-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataset_processor.py
More file actions
92 lines (87 loc) · 3.51 KB
/
dataset_processor.py
File metadata and controls
92 lines (87 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os
import json
import pandas as pd
import threading
import shutil
from utils import get_log_file
def extract_fields(data, field_map):
    """Project each record in *data* onto a renamed subset of its keys.

    For every dict in *data*, produce a new dict whose keys are the values
    of *field_map* and whose values are looked up under the corresponding
    original keys; missing keys yield None.
    """
    projected = []
    for record in data:
        row = {}
        for old_key, new_key in field_map.items():
            row[new_key] = record.get(old_key, None)
        projected.append(row)
    return projected
def convert_format(input_path, output_path, out_format):
    """Convert a dataset file between json / jsonl / csv representations.

    The input format is inferred from *input_path*'s extension; the output
    format is given explicitly by *out_format* ('json', 'jsonl' or 'csv').
    Raises ValueError on an unsupported input extension or output format.
    """
    in_ext = os.path.splitext(input_path)[1]
    if in_ext == '.csv':
        records = pd.read_csv(input_path).to_dict(orient='records')
    elif in_ext in ('.json', '.jsonl'):
        with open(input_path, 'r', encoding='utf-8') as fh:
            if in_ext == '.json':
                records = json.load(fh)
            else:
                # one JSON object per line
                records = [json.loads(line) for line in fh]
    else:
        raise ValueError('不支持的输入格式')

    if out_format == 'csv':
        pd.DataFrame(records).to_csv(output_path, index=False)
    elif out_format == 'json':
        with open(output_path, 'w', encoding='utf-8') as fh:
            json.dump(records, fh, ensure_ascii=False, indent=2)
    elif out_format == 'jsonl':
        with open(output_path, 'w', encoding='utf-8') as fh:
            fh.writelines(json.dumps(rec, ensure_ascii=False) + '\n'
                          for rec in records)
    else:
        raise ValueError('不支持的输出格式')
def merge_datasets(paths, output_path):
    """Concatenate several dataset files into a single output file.

    Each path in *paths* may be .json, .jsonl or .csv; the output format is
    inferred from *output_path*'s extension.

    Raises ValueError on an unsupported input or output extension. (The
    previous version silently skipped unrecognized inputs and silently
    produced no output file for an unrecognized output extension, which
    hid typos; now it fails loudly, consistent with convert_format.)
    """
    all_data = []
    for p in paths:
        ext = os.path.splitext(p)[1]
        if ext == '.json':
            with open(p, 'r', encoding='utf-8') as f:
                all_data.extend(json.load(f))
        elif ext == '.jsonl':
            with open(p, 'r', encoding='utf-8') as f:
                # one JSON object per line
                all_data.extend(json.loads(line) for line in f)
        elif ext == '.csv':
            all_data.extend(pd.read_csv(p).to_dict(orient='records'))
        else:
            raise ValueError('不支持的输入格式')
    out_ext = os.path.splitext(output_path)[1]
    if out_ext == '.json':
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(all_data, f, ensure_ascii=False, indent=2)
    elif out_ext == '.jsonl':
        with open(output_path, 'w', encoding='utf-8') as f:
            for item in all_data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
    elif out_ext == '.csv':
        pd.DataFrame(all_data).to_csv(output_path, index=False)
    else:
        raise ValueError('不支持的输出格式')
def append_dataset(src_path, dst_path):
    """Append all records from *src_path* onto the existing dataset at *dst_path*.

    The source may be .json, .jsonl or .csv; the destination keeps its own
    format (inferred from its extension).

    Raises ValueError on an unsupported source or destination extension.

    Fixes over the previous version:
      * CSV: appended rows are reindexed to the destination's existing
        header order — appending with header=False would otherwise
        silently misalign columns whenever the source's key order differs.
      * JSON: f.truncate() after the in-place rewrite, so stale trailing
        bytes can never survive the seek(0)/rewrite.
      * Unsupported destination extension now raises instead of being a
        silent no-op.
    """
    src_ext = os.path.splitext(src_path)[1]
    if src_ext == '.json':
        with open(src_path, 'r', encoding='utf-8') as f:
            src_data = json.load(f)
    elif src_ext == '.jsonl':
        with open(src_path, 'r', encoding='utf-8') as f:
            src_data = [json.loads(line) for line in f]
    elif src_ext == '.csv':
        src_data = pd.read_csv(src_path).to_dict(orient='records')
    else:
        raise ValueError('不支持的输入格式')
    dst_ext = os.path.splitext(dst_path)[1]
    if dst_ext == '.json':
        with open(dst_path, 'r+', encoding='utf-8') as f:
            dst_data = json.load(f)
            dst_data.extend(src_data)
            f.seek(0)
            json.dump(dst_data, f, ensure_ascii=False, indent=2)
            # drop leftover bytes in case the rewrite is shorter than the
            # original contents
            f.truncate()
    elif dst_ext == '.jsonl':
        with open(dst_path, 'a', encoding='utf-8') as f:
            for item in src_data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
    elif dst_ext == '.csv':
        # Align appended rows to the destination's existing column order.
        dst_cols = pd.read_csv(dst_path, nrows=0).columns
        pd.DataFrame(src_data).reindex(columns=dst_cols).to_csv(
            dst_path, mode='a', header=False, index=False)
    else:
        raise ValueError('不支持的输出格式')
def clean_temp(temp_dir, keep_hours=24):
    """Delete subdirectories of *temp_dir* whose mtime is older than *keep_hours* hours.

    Age is measured against the current wall-clock time. The previous
    version aliased `now = os.path.getmtime` and compared each subdir's
    mtime against the mtime of *temp_dir* itself — a directory's mtime
    changes whenever its contents change and is not "now", so fresh
    subdirs could be deleted (or stale ones kept) depending on when the
    parent was last touched.
    """
    cutoff = time.time() - keep_hours * 3600
    for entry in os.listdir(temp_dir):
        path = os.path.join(temp_dir, entry)
        # only subdirectories are cleaned; loose files are left alone
        if os.path.isdir(path) and os.path.getmtime(path) < cutoff:
            shutil.rmtree(path)