# readzip_minute_data 多进程处理数据 (processing data with multiple processes) [操作系统入门]
#
# 编程 (programming)

#!/usr/bin/env python

import os

import numpy as np

import py7zr

import shutil

import pandas as pd

import time

import multiprocessing

import re

def fun_time_l2(a, b):
    """Return 1 when ``a`` is numerically less than or equal to ``b``, else 0.

    Both arguments are converted with ``float()`` before the comparison, so
    numeric strings are accepted as well as numbers.

    Raises:
        ValueError, TypeError: if either argument cannot be converted by
            ``float()``.
    """
    return 1 if float(a) <= float(b) else 0

def read_files(filename):  # read and summarise the content of one file
    """Summarise one trade file into a single-row DataFrame.

    Every line that does not contain the text ``"find"`` is stripped and split
    on single spaces; the first three fields are read as ``time``, ``price``
    and ``vol`` (``price`` and ``vol`` must be integers).  ``amount`` is
    ``price * vol``.  Blank lines are ignored.

    The returned frame has one row labelled ``"value"`` and these columns, in
    order:

    * ``vol_t`` / ``amount_t`` - sums of ``abs(vol)`` and ``abs(amount)``.
    * ``z_xiao``, ``z_zhong``, ``z_da``, ``z_te_da`` - sums of the positive
      amounts whose size is in (0, 40000), [40000, 200000),
      [200000, 1000000) and [1000000, inf).
    * ``f_xiao``, ``f_zhong``, ``f_da``, ``f_te_da`` - the same size bands for
      the negative amounts (the sums are negative).
    * ``re_min_L2`` - percentage of ``abs(vol)`` traded at a price below
      1.02 times the minimum price (NaN when the total volume is 0).
    * ``time_l2`` - number of distinct minutes containing such a trade.
    * one float-labelled column per minute and metric.  The minute key is the
      ``time`` text without its last two characters; ``<minute>.1`` holds the
      mean price, ``<minute>.2`` the summed ``abs(vol)`` and ``<minute>.3``
      the summed signed ``vol``.  These columns are sorted by their label.

    Raises:
        ValueError: if a data line has fewer than three fields, if the file
            holds no data lines, or if a field / minute key is not numeric.
    """
    records = []
    with open(filename, "r") as handle:
        for line_no, line in enumerate(handle, start=1):
            if "find" in line:  # marker lines are not trade records
                continue
            text = line.strip()
            if not text:
                continue
            fields = text.split(" ")
            if len(fields) < 3:
                raise ValueError(
                    "%s: line %d has fewer than 3 fields: %r" % (filename, line_no, text)
                )
            records.append(fields[:3])
    if not records:
        raise ValueError("%s: no trade records found" % (filename,))

    trades = pd.DataFrame(records, columns=["time", "price", "vol"])
    # Convert once, to an explicit 64-bit type: "int" / "long" are only
    # 32 bits wide on some platforms and price * vol can exceed that.
    trades["price"] = trades["price"].astype("int64")
    trades["vol"] = trades["vol"].astype("int64")
    trades["amount"] = trades["price"] * trades["vol"]

    amount = trades["amount"]
    abs_amount = amount.abs()
    abs_vol = trades["vol"].abs()
    vol_t = abs_vol.sum()

    summary = {"vol_t": vol_t, "amount_t": abs_amount.sum()}

    # Size bands as (label, inclusive lower bound, exclusive upper bound).
    # A zero amount is excluded by the sign masks below.
    bands = (
        ("xiao", 0, 40000),
        ("zhong", 40000, 200000),
        ("da", 200000, 1000000),
        ("te_da", 1000000, None),
    )
    for prefix, sign_mask in (("z", amount > 0), ("f", amount < 0)):
        for label, low, high in bands:
            in_band = abs_amount >= low
            if high is not None:
                in_band = in_band & (abs_amount < high)
            summary[prefix + "_" + label] = amount[sign_mask & in_band].sum()

    # Share of volume traded within 2 % above the minimum price.
    near_low = trades["price"] < trades["price"].min() * 1.02
    near_low_vol = abs_vol[near_low].sum()
    summary["re_min_L2"] = near_low_vol / vol_t * 100 if vol_t else float("nan")

    # Minute key: the time text without its last two characters.
    minute = trades["time"].str[:-2]
    summary["time_l2"] = minute[near_low].nunique(dropna=False)

    per_minute = (
        pd.DataFrame(
            {
                "minute": minute,
                "price": trades["price"],
                "abs_vol": abs_vol,
                "vol": trades["vol"],
            }
        )
        .groupby("minute")
        .agg({"price": "mean", "abs_vol": "sum", "vol": "sum"})
    )

    # Encode the metric in the fractional part of the label so that all
    # per-minute values fit in one flat, float-sortable index.
    minute_parts = []
    for suffix, column in ((".1", "price"), (".2", "abs_vol"), (".3", "vol")):
        labels = [float(key + suffix) for key in per_minute.index]
        minute_parts.append(
            pd.Series(per_minute[column].to_numpy(), index=labels, name="value")
        )
    minute_values = pd.concat(minute_parts).sort_index()

    combined = pd.concat([pd.Series(summary, name="value"), minute_values])
    return combined.to_frame(name="value").T

def extract_files(filename):  # extract a 7z archive
    """Extract the 7z archive ``filename`` under ``pathsave`` and return the folder.

    The first path component of the archive's first member is taken as the
    name of the folder the archive unpacks into.  Any existing folder of that
    name under ``pathsave`` is deleted and recreated before the whole archive
    is extracted into ``pathsave``.

    ``pathsave`` is read from module scope; the script entry point assigns it.

    Returns:
        The path ``<pathsave>/<top-level folder name>``.

    Raises:
        ValueError: if the archive has no members.
    """
    with py7zr.SevenZipFile(filename, "r") as archive:
        members = archive.getnames()  # names of the entries inside the archive
        if not members:
            raise ValueError("archive %r contains no entries" % (filename,))
        top_dir = members[0].split("/")[0]  # top-level folder inside the archive
        # os.path.join gives the same folder whether or not pathsave ends
        # with a separator; plain concatenation silently builds a sibling
        # path when it does not.
        savedir = os.path.join(pathsave, str(top_dir))
        if os.path.exists(savedir):
            shutil.rmtree(savedir)  # drop a stale folder of the same name
        os.mkdir(savedir)  # recreate it empty
        archive.extractall(pathsave)  # unpack everything under pathsave
    return savedir

def read_dirs(savedir):  # list a folder
    """Return the paths of all entries in ``savedir`` as a NumPy string array.

    Each entry name is joined to ``savedir`` with ``os.path.join``.  Entries
    are returned in sorted name order so the result is deterministic.  An
    empty folder yields an empty string array.
    """
    names = sorted(os.listdir(savedir))
    # dtype=str keeps the array a string array even when it is empty.
    return np.array([os.path.join(savedir, name) for name in names], dtype=str)

def sub_process(df_only_name1, q):
    """Summarise a batch of files and put the combined result on ``q``.

    Intended as a ``multiprocessing.Process`` target.  For every path in
    ``df_only_name1``:

    * a zero-byte file is reported on stdout and skipped;
    * any other file is passed to ``read_files`` and the file name without
      its extension is inserted as the first column, ``a_name``.

    The per-file frames are concatenated with a fresh 0..n-1 index and
    exactly one DataFrame is put on ``q`` without blocking.  That frame is
    empty when no file produced data.  The function then simply returns,
    which lets the process end with exit code 0.

    Args:
        df_only_name1: iterable of file paths to process.
        q: queue-like object with ``put(item, block=...)``.
    """
    frames = []
    for file in df_only_name1:
        stem = os.path.splitext(os.path.basename(file))[0]
        if not os.path.getsize(file):  # zero-byte file: nothing to parse
            print("file siz = 0")
            print(file)
            continue
        summary = read_files(file)
        summary.insert(0, "a_name", stem)
        frames.append(summary)

    # One concat at the end avoids re-copying the accumulated frame per file.
    if frames:
        combined = pd.concat(frames, ignore_index=True)
    else:
        combined = pd.DataFrame()
    q.put(combined, block=False)

def _collect_worker_results(file_names, epochs):
    """Run ``sub_process`` over ``file_names`` in ``epochs`` processes.

    ``file_names`` is cut into ``epochs`` contiguous slices of
    ``len(file_names) // epochs`` items; the last slice also takes the
    remainder.  Each slice is handled by its own process and each process is
    expected to put exactly one DataFrame on the shared queue.

    Returns:
        The list of ``epochs`` DataFrames, in arrival order.

    Raises:
        RuntimeError: if a worker exits with a non-zero code before all
            results have arrived (instead of waiting forever).
    """
    import queue  # local import: only this helper needs queue.Empty

    total = len(file_names)
    step = total // epochs
    q = multiprocessing.Queue(maxsize=epochs)

    workers = []
    for part in range(epochs):
        begin = part * step
        end = total if part == epochs - 1 else begin + step
        workers.append(
            multiprocessing.Process(target=sub_process, args=(file_names[begin:end], q))
        )
    for worker in workers:
        worker.start()

    # Drain the queue before joining: a worker cannot finish while its
    # result is still waiting to be consumed.
    frames = []
    while len(frames) < epochs:
        try:
            frames.append(q.get(timeout=5))
        except queue.Empty:
            if any(worker.exitcode not in (None, 0) for worker in workers):
                for worker in workers:
                    if worker.is_alive():
                        worker.terminate()
                raise RuntimeError(
                    "a worker process exited abnormally before returning its result"
                )
    for worker in workers:
        worker.join()
    return frames


if __name__ == "__main__":
    # NOTE(review): the path literals in the version this was recovered from
    # had evidently lost their backslashes (e.g. "G:datas of statuspython
    # codes").  The separators below are reconstructed - confirm them
    # against the real folder layout before running.
    # Alternative data location: r"G:\datas of status\tick-by-tick trade"
    path = r"G:\datas of status\2018-tick-by-tick trade"  # where the data archives live
    # Working folder for extracted data and CSV output.  The trailing
    # separator is kept so that plain string concatenation onto it still
    # yields a path inside the folder.
    pathsave = os.path.join(r"G:\datas of status\python codes", "")
    pathTemp = r"G:\datas of status\python codes\everyday_data\temp"

    # Month folders; sorted so that the index chosen below is deterministic.
    month_names = sorted(os.listdir(path))
    print(month_names)
    month_dirs = [os.path.join(path, name) for name in month_names]

    # ==================== start work
    m = 13  # index of the first month folder to process
    do_num = 2  # number of month folders to process, walking backwards from m
    epochs = 3  # number of worker processes per daily archive

    daily_dir = os.path.join(pathsave, "everyday_data")
    os.makedirs(daily_dir, exist_ok=True)

    for n in range(do_num):
        month_dir = month_dirs[m - n]
        print(month_dir)

        day_names = sorted(os.listdir(month_dir))
        print(day_names)
        day_archives = [os.path.join(month_dir, name) for name in day_names]
        print(day_archives)

        month_frames = []
        for filename in day_archives:
            print("=========")
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            # To process one already-extracted folder instead, assign its
            # path to savedir here and skip the extraction.
            savedir = extract_files(filename)
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

            try:
                # The date tag is the trailing run of digits of the extracted
                # folder path once dashes are removed.
                found = re.search(r"\d+$", savedir.replace("-", ""))
                if found is None:
                    raise ValueError(
                        "cannot derive a date from folder name %r" % (savedir,)
                    )
                tempdir = found.group()

                file_names = read_dirs(savedir)
                frames = _collect_worker_results(file_names, epochs)
            finally:
                # Always remove the extracted files, even when a step failed.
                shutil.rmtree(savedir)

            frames = [frame for frame in frames if not frame.empty]
            if not frames:
                print("no data produced for", filename)
                continue

            day_frame = pd.concat(frames)
            day_frame.insert(1, "date", tempdir, allow_duplicates=False)
            day_frame = day_frame.sort_values(by=["time_l2"], ascending=True)

            save_dfile = os.path.join(daily_dir, tempdir + ".csv")
            day_frame.to_csv(save_dfile, sep=",", index=False, header=True)
            month_frames.append(day_frame)

            print(filename)
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

        if not month_frames:
            continue

        month_all = pd.concat(month_frames)
        # The monthly file is named after the first six characters of the
        # first date tag.
        month_tag = str(month_all["date"].iloc[0])[0:6]
        save_file = os.path.join(pathsave, month_tag + ".csv")
        print(save_file)
        month_all.to_csv(save_file, sep=",", index=False, header=True)

    raise SystemExit(0)

  

# readzip_minute_data 多进程处理数据
#
# 以上是 readzip_minute_data 多进程处理数据 [操作系统入门] 的全部内容, 来源链接: utcz.com/z/519331.html
#
# 回到顶部