Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sunprinceS/768b67b73bcd284f5bba60073494e7ac to your computer and use it in GitHub Desktop.
Save sunprinceS/768b67b73bcd284f5bba60073494e7ac to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from pathlib import Path
import json
import time
import re
from numpy.lib.format import open_memmap
import numpy as np
import kaldiio
import pickle
from shutil import copyfile
from collections import Counter
# ---------------------------------------------------------------------------
# Convert espnet dump/<split>/deltafalse/data_unigram150.json files into
# per-split, per-accent memmapped feature matrices + label/length arrays.
# ---------------------------------------------------------------------------
OUTPATH = 'mydata-separate'   # output root, created under the current working directory
SPLIT_BY_ACCENT = False       # NOTE(review): unused in this script — TODO confirm intent
LANG = 'en'
ACCENT_LIST = ['african','australia','bermuda','canada','england','hongkong','indian','ireland','malaysia','newzealand', 'scotland', 'philippines', 'singapore', 'southatlandtic', 'us', 'wales']
# ACCENT_LIST=['us']

if __name__ == "__main__":
    cur_path = Path.cwd()
    out_path = cur_path.joinpath(OUTPATH)
    out_path.mkdir(exist_ok=True)

    # Copy the sentencepiece model and unit inventory next to the converted data.
    dir_path = cur_path.joinpath('data', 'lang_char')
    for d in ['valid_train_en_unigram150.model', 'valid_train_en_unigram150_units.txt']:
        copyfile(dir_path.joinpath(d), out_path.joinpath(d))

    # utt -> accent map; utterances with 'NULL' or 'other' accent are dropped.
    utt2accent = {}
    with open(cur_path.joinpath('data', 'validated_en', 'utt2accent')) as fin:
        for line in fin:
            utt, accent = line.rstrip().split(' ')
            if accent != 'NULL' and accent != 'other':
                utt2accent[utt] = accent

    # One (split-name, espnet-json) pair per dump/<split> directory.
    trav_ls = [(d.name, d.joinpath('deltafalse', 'data_unigram150.json'))
               for d in cur_path.joinpath('dump').iterdir() if d.is_dir()]

    meta = dict()
    first = True
    # Strips the language tag (e.g. "_en") from a split name.
    regex = re.compile('[^a-zA-Z]' + LANG)

    for name, jsf in trav_ls:
        t = time.time()
        print(f"Process {name}...")
        # Drop the language tag, then the 6-char leading prefix of the split name.
        # NOTE(review): the [6:] slice assumes a fixed-length prefix (e.g. "valid") — confirm.
        trim_name = regex.sub('', name)[6:]
        p = out_path.joinpath(trim_name)
        p.mkdir(exist_ok=True)

        with open(jsf, 'rb') as f:
            data = json.load(f)['utts']
        utts = list(data.keys())
        idim = int(data[utts[0]]['input'][0]['shape'][-1])
        odim = int(data[utts[0]]['output'][0]['shape'][-1])
        if not first:
            assert idim == meta['idim'] and odim == meta['odim'], \
                "Train/Dev/Eval should have same input/output dimension"
        else:
            meta['idim'] = idim
            meta['odim'] = odim
            first = False

        # Sort utterances by input length (ascending) so the output arrays are
        # length-ordered, which makes bucketing/batching cheap downstream.
        sorted_data = sorted(data.items(), key=lambda d: d[1]['input'][0]['shape'][0])

        utt2feat_ls = {accent: list() for accent in ACCENT_LIST}
        utt2ilen_ls = {accent: list() for accent in ACCENT_LIST}
        utt2label_ls = {accent: list() for accent in ACCENT_LIST}
        utt2olen_ls = {accent: list() for accent in ACCENT_LIST}

        print("Read json file")
        for utt_name, info in sorted_data:
            if utt_name not in utt2accent:
                continue
            accent = utt2accent[utt_name]
            # Re-root the ark path by dropping the first 7 path components.
            # NOTE(review): assumes the dump lives under a 7-component absolute prefix — confirm.
            feat_path = '/'.join(info['input'][0]['feat'].split('/')[7:])
            feat = kaldiio.load_mat(feat_path)  # np array, (frames, idim)
            ilen = info['input'][0]['shape'][0]  # int
            label = list(map(int, info['output'][0]['tokenid'].split(' ')))
            olen = info['output'][0]['shape'][0]  # int
            assert len(label) == olen, "Label length != olen !!?"
            if olen == 0:
                print(f"{utt_name} is 0-length")
                continue
            utt2feat_ls[accent].append(feat)
            utt2ilen_ls[accent].append(ilen)
            utt2label_ls[accent].append(label)
            utt2olen_ls[accent].append(olen)
        print("Reading takes: {:.3f}".format(time.time() - t))

        for accent in ACCENT_LIST:
            # Per-accent timer: previously `t` was set once before this loop,
            # so every "Writing takes" line reported a cumulative time.
            t = time.time()
            print(f"=== {accent} ===")
            if not utt2feat_ls[accent]:
                # np.vstack raises on an empty list; a split may simply
                # contain no utterances of this accent.
                print(f"No utterances for {accent}, skipped")
                continue
            final_path = p.joinpath(accent)
            final_path.mkdir(exist_ok=True)

            feat_mat = np.vstack(utt2feat_ls[accent]).astype('float32')
            del utt2feat_ls[accent]  # free the per-utterance copies ASAP
            ilen_arr = np.array(utt2ilen_ls[accent]).astype('int')
            label_arr = np.concatenate(utt2label_ls[accent]).astype('int')
            olen_arr = np.array(utt2olen_ls[accent]).astype('int')

            # Stream the stacked features through a writable memmap so the
            # on-disk file is written without a second full in-memory copy.
            feat_mat_data = open_memmap(final_path.joinpath('feat.dat'), mode='w+',
                                        dtype='float32', shape=feat_mat.shape)
            print("Reading feat_mat finished")
            feat_mat_data[:] = feat_mat[:]
            del feat_mat
            feat_mat_data.flush()
            del feat_mat_data

            np.save(final_path.joinpath('ilens.npy'), ilen_arr)
            np.save(final_path.joinpath('label.npy'), label_arr)
            np.save(final_path.joinpath('olens.npy'), olen_arr)
            print("Writing takes: {:.3f}".format(time.time() - t))

            # BUG FIX: the length-count pickles were written into the split
            # directory `p`, so each accent overwrote the previous accent's
            # files; write them into the per-accent directory instead.
            i_cnt = Counter(utt2ilen_ls[accent])
            o_cnt = Counter(utt2olen_ls[accent])
            with open(final_path.joinpath('ilen-cnt.pkl'), 'wb') as fout:
                pickle.dump(i_cnt, fout)
            with open(final_path.joinpath('olen-cnt.pkl'), 'wb') as fout:
                pickle.dump(o_cnt, fout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment