首页 > 解决方案 > 在 python 中,是否可以在协程任务的新线程中启动方法?

问题描述

这将是我在堆栈上的第一个问题,如果问题不够详细或缺少信息,请多多包涵……

我有一个协程任务处理进入程序的实时滴答数据(包括断开和重新连接的事件),我正在收集这些数据。我希望能够在引发市场数据断开事件时保存数据,但是用于保存数据的代码部分非常大,并且所需的时间会随着存储的数据量而增长。这个保存数据的过程实际上是在阻止重新连接。

我正在使用字典键:queue.Queue 来缓存数据,其中 ric 是数据的 (400~1000) ric 代码,并且对象通过 put_no_wait 放入队列

self.DataGrid[ric]
DataGrid[ric].put_nowait(df)

并通过基本上将队列中的每个项目弹出到数据帧中来保存数据,然后使用 pd.to_csv() 保存它。

    def SaveDataHandler(self):
        if len(self.DataGrid) > 0:
            self.logger.info('Beginning to save data')
            for ric in self.DataGrid.keys():
                if self.DataGrid[ric].qsize() > 0:
                    self.logger.info('Saving data for ric=' + ric)
                    self.write_file(ric)
            self.logger.info('Save data completed')
        else:
            return

    def queue_to_frame(self,ric):
        df = pd.DataFrame()
        while self.DataGrid[ric].qsize() > 0:
            df = df.append(self.DataGrid[ric].get_nowait())
        return df

    def write_file(self,ric):
        df = self.queue_to_frame(ric)
        self.store.collection(ric).item(today_date_str).write_data(df,saveIndex=False)

----------------------------------------------------------------------------------------------

    def write_data(self,df,saveIndex=False):
        if len(df) > 0:
            num = len(self.get_files()) + 1
            df.to_csv(self.path + str(num), sep="\t", quoting=csv.QUOTE_NONE,index=False)

我曾尝试_thread.start_new_thread(SaveDataHandler)在市场数据连接断开时使用,但该过程仍会阻止重新连接检查,这会延迟连接市场数据的时间,这并不理想。

回到我的问题,有没有办法启动一个新线程,这样我就不会遇到阻塞检查重新连接的协程任务?(或者只是我做错了,有一种方法可以将数据附加到数据库或文件系统?我尝试了 pystore,但它们的附加功能有缺陷,无法使用)

标签: pythonmultithreadingcoroutine

解决方案


推荐阅读