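A note on how to use NumPy to parse an npz file into multiple csv files. The example uses the npz file structure from my own project, so files from other sources may be laid out differently.
Run the following code to inspect the key structure: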
    import numpy as np

    if __name__ == '__main__':
        src_file = r'D:\test-001.npz'
        results = np.load(src_file)
        # Print every key stored in the archive
        for key, value in results.items():
            print(key)
The output looks something like this:
CAN/2/localmap1__message394/left_line_id_11
CAN/2/localmap1__message394/left_line_id_12
CAN/2/localmap1__message394/left_line_id_13
CAN/2/localmap1__message396/_timestamp
CAN/2/localmap1__message396/timeStamp
CAN/2/localmap1__message396/right_line_id_15
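From this we can see that the keys are organized as protocol/channel/message/signal, for example CAN / 2 / localmap1__message394 / left_line_id_11. What we want is one csv file per message, with one column per signal.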
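To make the key layout concrete, here is a minimal sketch that splits one key into named parts. The `KeyParts` type and the `split_key` helper are hypothetical names introduced only for illustration, and the field names reflect my reading of the sample keys rather than any documented format.

```python
from typing import NamedTuple, Optional


class KeyParts(NamedTuple):
    """Hypothetical container for the four slash-separated parts of a key."""
    protocol: str
    channel: str
    message: str
    signal: str


def split_key(key: str) -> Optional[KeyParts]:
    """Return the four parts of a key, or None if the key has a different shape."""
    parts = key.split("/")
    if len(parts) != 4:
        return None
    return KeyParts(*parts)


parts = split_key("CAN/2/localmap1__message394/left_line_id_11")
print(parts.message, parts.signal)
```

Keys that do not have exactly four parts yield `None`, so a caller can skip them.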
Next, extend the test code so that it also prints the length of each array:
    import numpy as np

    if __name__ == '__main__':
        src_file = r'D:\test-001.npz'
        results = np.load(src_file)
        # Print every key together with the number of records it holds
        for key, value in results.items():
            print(key + "\t" + str(len(value)))
Running it produces output like this:
CAN/2/localmap1__message394/left_line_id_11 322
CAN/2/localmap1__message394/left_line_id_12 322
CAN/2/localmap1__message394/left_line_id_13 322
CAN/2/localmap1__message396/_timestamp 350
CAN/2/localmap1__message396/timeStamp 350
CAN/2/localmap1__message396/right_line_id_15 350
Each signal holds many records, and the signals that belong to the same message have essentially the same number of records.
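One way to check this observation across a whole file is to collect the distinct array lengths per message. The sketch below uses a hypothetical helper, `lengths_by_message`, and assumes the four-part key layout seen in the sample output. It accepts any mapping of key to array, so a plain dict stands in here for the object returned by `np.load`.

```python
from collections import defaultdict


def lengths_by_message(arrays):
    """Map each message name to the set of record counts found among its signals."""
    lengths = defaultdict(set)
    for key, value in arrays.items():
        parts = key.split("/")
        if len(parts) != 4:
            continue  # skip keys that do not match the expected layout
        lengths[parts[2]].add(len(value))
    return dict(lengths)


# Made-up sample data, for illustration only
sample = {
    "CAN/2/msgA/_timestamp": [0.0, 0.1, 0.2],
    "CAN/2/msgA/speed": [10, 11, 12],
    "CAN/2/msgB/_timestamp": [0.0, 0.1],
    "CAN/2/msgB/angle": [1, 2, 3],
    "meta": [1],
}
print(lengths_by_message(sample))
```

A message whose set contains more than one length has signals of unequal size and will need truncating before it can be written as a rectangular table.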
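We can also see that the data in the npz file is stored row by row, one array per signal. Converting it to csv files therefore requires a row-to-column transposition, which can also be done with NumPy. The complete code is as follows: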
    import os
    import shutil
    import sys
    import time

    import numpy as np


    def parse_npz(data_folder, src_file):
        # Start from an empty output folder
        if os.path.exists(data_folder):
            shutil.rmtree(data_folder)
        os.makedirs(data_folder)
        results = np.load(src_file)
        signals_by_message = {}
        message_name = None
        for key, value in results.items():
            data = key.split("/")
            # Skip keys that do not have exactly four parts
            if len(data) != 4:
                continue
            new_message = data[-2]
            # A new message starts, so write out the previous one first
            if new_message != message_name:
                process_message(data_folder, message_name, signals_by_message)
                message_name = new_message
                signals_by_message[message_name] = []
            signal = str(data[-1])
            if signal == '_timestamp':
                # Scale by 1,000,000, then make the values relative to the first record
                value = np.multiply(value, 1_000_000).astype(np.int64)
                value = value - value[0]
                signal = 'Timestamp'
            # One row per signal: the name first, then its values
            signals_by_message[message_name].append([signal, *value])
        # Write out the last message
        process_message(data_folder, message_name, signals_by_message)


    def process_message(folder, message, signals_by_message):
        if message is None:
            return
        signals = signals_by_message[message]
        # Truncate every row to the shortest one so the table is rectangular
        length = min(len(v) for v in signals)
        signals = [v[0:length] for v in signals]
        # Turn rows into columns to match the required csv layout
        csv_data = np.array(signals).swapaxes(0, 1)
        # Save to disk
        file_name = os.path.join(folder, message + ".csv")
        np.savetxt(file_name, csv_data, delimiter=",", fmt="%s")
        # print('-------------save file to ' + file_name + '---------------')
        del signals_by_message[message]


    if __name__ == '__main__':
        # data_folder = sys.argv[1]
        # src_file = sys.argv[2]
        data_folder = r'D:\npz_csv'
        src_file = r'D:\test-001.npz'
        start = time.time()
        parse_npz(data_folder, src_file)
        end = time.time()
        print(f'=============== Parse npz file time cost:{end - start:.4f}s =============')
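The script above puts the signal name and its values in the same row, so `np.array` converts everything to strings before transposing. As an alternative sketch (not what the script does), the names can be kept apart and passed through the `header` argument of `np.savetxt`, leaving the data numeric. The helper `signals_to_csv_text` and the sample values are hypothetical, and the `"%.15g"` format is just one choice that prints integers without exponent notation.

```python
import io

import numpy as np


def signals_to_csv_text(signals):
    """Build csv text from a dict of signal name -> 1-D numeric array, one column per signal."""
    names = list(signals)
    # Truncate to the shortest signal so the columns line up
    length = min(len(signals[name]) for name in names)
    columns = np.column_stack([np.asarray(signals[name])[:length] for name in names])
    buffer = io.StringIO()
    # comments="" stops savetxt from prefixing the header line with "# "
    np.savetxt(buffer, columns, delimiter=",", fmt="%.15g",
               header=",".join(names), comments="")
    return buffer.getvalue()


demo = {
    "Timestamp": np.array([0, 10, 20]),
    "left_line_id_11": np.array([7, 8, 9, 10]),  # one extra record, gets truncated
}
print(signals_to_csv_text(demo))
```

Writing to an in-memory buffer keeps the example free of file paths. Passing a file name to `np.savetxt` instead would write the same content to disk.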