首页 > 解决方案 > 使用python线程访问大量数据

问题描述

我有很多 pandas 数据框(DataFrame)。我从这些数据框中读取信息,经过处理后返回一些结果。由于我使用的函数很耗时(每次循环大约 5 秒),而且要执行很多次循环,所以我想把它并行化。

首先,我尝试了 multiprocessing 模块的 Process。问题是我有大量只读数据存储在内存中,而不同的进程各自使用独立的内存,因此多进程并不是一个好主意。

所以我决定尝试线程。我想多次调用一个函数,并将输入分成几部分。因此,我不是一次访问大约 10k 个数据帧,而是尝试每次函数调用访问大约 1k 个数据帧,但运行 12 个线程。

我的想法是:

def pare(chunk_size=332, max_threads=12):
    """Run ``find_something2`` over slices of ``reduced_stocks`` in threads.

    The keys of the module-level ``reduced_stocks`` mapping are split into
    consecutive slices of ``chunk_size`` names. One thread is started per
    slice, up to ``max_threads`` threads, and the call blocks until every
    started thread has finished. Each thread is handed its slice index and
    the module-level ``results`` container so it can store its own output.

    Args:
        chunk_size: Number of stock names handed to each thread.
        max_threads: Upper bound on the number of threads (and therefore on
            the number of slices that get processed).

    NOTE(review): slices beyond ``max_threads`` are not processed, exactly as
    in the original hard-coded ``range(12)`` loop. Raise ``max_threads`` (and
    make sure ``results`` can hold that many entries) to cover every stock.

    NOTE(review): CPython threads share the GIL, so if ``find_something2`` is
    CPU-bound this is unlikely to beat the serial loop -- confirm by
    profiling before relying on it.
    """
    from threading import Thread

    names = list(reduced_stocks)
    chunks = [names[start:start + chunk_size]
              for start in range(0, len(names), chunk_size)]

    workers = []
    # Iterate over the slices that actually exist instead of a fixed
    # range(12): with fewer than 12 slices the old code raised IndexError
    # after some threads had already started and were never joined.
    for slot, chunk in enumerate(chunks[:max_threads]):
        worker = Thread(target=find_something2,
                        args=(1.4, 2500, 8, chunk, slot, results))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

但是我发现并行执行和串行执行所花的时间几乎相同。使用线程池会有更好的结果吗?如前所述,问题在于我想并行运行的函数必须返回一个列表,所以在上面的例子中,我把每个线程的结果存储在一个全局列表变量中。

我还附上了实际运行的 find_something2 函数和 worth_buy 函数。我只在 current_date > '2005-01-01' 的情况下使用它,所以不必关注其他分支。

def _candidate_row(stock, frame, far, threl, my_limit):
    """Return ``[date, stock, ratio, when_sell, total, income]`` or ``None``.

    Looks at the ``current_date:end_date`` window of ``frame``, takes the
    lowest ``Low`` among the first ``far`` rows of that window as the buy
    point, and asks ``worth_buy`` whether the trade is worthwhile. ``None``
    means the stock was rejected at one of these steps.
    """
    window = frame.loc[current_date:end_date]
    if window.empty:
        return None
    lows = window.head(far).Low  # evaluated once; used for both date and value
    mydate = lows.idxmin()
    my_min = lows.min()
    if not (total_money >= my_min > 0):
        return None
    ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
                                                   thres=threl,
                                                   sell_limit=my_limit)
    if not ans:
        return None
    return [mydate, stock, res, when_sell, total, income]


def find_something2(threl=2.0, my_limit=150, far=365, mystocks=None, index=None, results=None):
    """Find stocks that are worth buying.

    Only reads module-level state (``current_date``, ``end_date``,
    ``total_money``, ``min_date``, ``dates_dict``, ``mylist``,
    ``reduced_stocks``); nothing is rebound here, so no ``global`` statement
    is needed.

    Args:
        threl: Minimum sell/buy ratio, forwarded to ``worth_buy`` as ``thres``.
        my_limit: Forwarded to ``worth_buy`` as ``sell_limit``.
        far: Number of leading rows of the date window searched for the
            lowest ``Low``.
        mystocks: Iterable of keys into ``reduced_stocks``. Required when
            ``current_date`` is not before ``'2005-01-01'``; ignored otherwise.
        index: Key/slot in ``results`` under which the output is stored.
        results: Optional container receiving the output at ``results[index]``.

    Returns:
        * ``current_date > '1990-01-01'``: the date-sorted rows are stored in
          ``results[index]`` and ``None`` is returned. If ``results`` is
          ``None`` the rows are returned instead (the original raised
          ``TypeError`` in that case).
        * ``current_date > '1985-01-01'``: the rows sorted by date.
        * otherwise: every 11th row of the rows sorted by income, descending.

        Each row is ``[date, stock, ratio, when_sell, total, income]``.
    """
    worthing = []

    if current_date < '2005-01-01':
        # The '2015-01-01' threshold of the original if/elif chain is omitted:
        # it could never be taken under the '< 2005-01-01' guard above.
        if current_date > '2000-01-01':
            min_income = 3 * 10 ** 6
        elif current_date > '1985-01-01':
            min_income = 1.5 * 10 ** 6
        else:
            min_income = None  # earliest period: accept every worthwhile stock

        for stock in mylist:
            if dates_dict[stock][0] <= min_date:  # don't open all files
                row = _candidate_row(stock, open_txt(stock), far, threl, my_limit)
                if row is not None and (min_income is None or row[5] > min_income):
                    worthing.append(row)
    else:
        for stock in mystocks:
            row = _candidate_row(stock, reduced_stocks[stock], far, threl, my_limit)
            if row is not None and row[5] > 3 * 10 ** 6:
                worthing.append(row)

    if current_date > '1990-01-01':
        by_date = sorted(worthing, key=itemgetter(0))
        if results is None:
            # No output container supplied: hand the rows back instead of
            # failing on ``None[index] = ...``.
            return by_date
        results[index] = by_date
        return None
    if current_date > '1985-01-01':
        return sorted(worthing, key=itemgetter(0))
    by_income = sorted(worthing, key=itemgetter(5), reverse=True)
    return by_income[::11]

def worth_buy(stock, frame, date, code, thres=2.0, sell_limit=0):
    """Check whether ``stock`` bought on ``date`` is worth buying.

    The candidate selling window is ``frame.loc[date:find_limit(date,
    sell_limit)]``. If the stock is already in ``purchased``, the window is
    narrowed to start at ``find_limit(<last '/'-separated entry of
    purchased[stock][3]>, 1)``. The selling date is the row of that window
    with the highest ``High`` relative to the buy price.

    Args:
        stock: Key used to look the stock up in ``purchased``.
        frame: Data frame of the stock, with a ``High`` column and ``code``.
        date: Index label of the buy row.
        code: Column whose value is used as both buy and sell price.
        thres: Minimum sell/buy ratio for the trade to count as worthwhile.
        sell_limit: Passed to ``find_limit`` to bound the selling window.

    Returns:
        ``(ok, ratio, when_sell, total, income)`` where ``ratio`` is
        sell/buy, ``total`` is whatever ``buy_total`` reports and ``income``
        is ``(sell - buy) * total``. When a window is empty or the selling
        date falls after ``end_date`` the result is ``(False, 0, '', 0, 0)``.
    """
    rejected = (False, 0, '', 0, 0)

    horizon = find_limit(date, sell_limit)
    window = frame.loc[date:horizon]
    if window.empty:
        return rejected  # nothing inside the date limits

    if stock in purchased:
        # Already held: only look at dates from the limit derived from the
        # last recorded date onwards.
        recorded_dates = purchased[stock][3].split(sep='/')
        resume_from = find_limit(recorded_dates[-1], 1)
        window = window.loc[resume_from:]
        if window.empty:
            return rejected  # nothing inside the date limits

    highs = window.High
    buy_value = frame.at[date, code]
    when_sell = (highs / buy_value).idxmax()
    sell_value = frame.at[when_sell, code]
    total = buy_total(frame, date, code, when_sell)
    ans = sell_value / buy_value  # could be factored out into a function
    income = (sell_value - buy_value) * total

    if when_sell <= end_date:
        return ans >= thres, ans, when_sell, total, income
    return rejected

会不会是因为大部分时间都花在读取数据而不是处理数据上,所以性能没有提升?我认为(但不确定),既然我只是读取而不写入,使用多线程应该会有更好的结果。

另外很重要的一点是,find_something2 函数和其他函数都不会更新任何全局值,此时所有数据都是只读的。所有数据框都已存储在内存中的一个字典里,并通过名称访问。

提前致谢

编辑:这是我在字典中存储数据框的方式:

def initialize(data_dir='C:/Users/tzagk/Downloads/Stocks/'):
    """Load every stock file into memory and pick the starting trade.

    For each name in ``file_names`` whose file is non-empty, the CSV is read
    into a data frame, stored in ``dates_dict[name]`` as
    ``[first index label, frame]``, and the name is appended to ``mylist``.

    Among the stocks whose first index label is ``<= min_date`` and whose
    lowest ``Low`` in the first 365 rows is ``<= 1``, the one with the best
    ratio reported by ``worth_buy`` (thres=1.2, sell_limit=3000) sets the
    module-level ``current_date``, ``current_name`` and ``min_date_sell``.

    Args:
        data_dir: Directory prefix (including the trailing separator) that is
            prepended to each file name.
    """
    # Only the names rebound below need a ``global`` declaration;
    # ``dates_dict`` and ``mylist`` are mutated in place.
    global current_date, current_name, min_date_sell

    best_ratio = 1
    for stock in file_names:
        path = data_dir + stock
        if getsize(path) <= 0:
            continue
        frame = pd.read_csv(path, dtype=size_dict, usecols=use_cols, header=0,
                            index_col=0)
        if frame.empty:
            # A header-only file has a positive size but no rows; indexing
            # ``frame.index[0]`` on it would raise IndexError.
            continue

        first_date = frame.index[0]
        dates_dict[stock] = [first_date, frame]  # store start_time, data frame
        mylist.append(stock)

        if first_date <= min_date:
            early_lows = frame.head(365).Low  # first 365 rows, evaluated once
            check_date = early_lows.idxmin()
            if early_lows.min() <= 1:
                ans, res, when_sell, total, income = worth_buy(stock, frame, check_date,
                                                               code='Low', thres=1.2,
                                                               sell_limit=3000)
                if ans and res > best_ratio:  # keep the best ratio seen so far
                    best_ratio = res
                    current_date = check_date  # set start date
                    current_name = stock
                    min_date_sell = when_sell

标签: python, pandas, multithreading, multiprocessing

解决方案


推荐阅读