python - 生产者-消费者问题 - 尝试保存到 csv 文件中
问题描述
所以这个看似简单的问题正在引起我的注意。
我有一个数据集datas
(但是,生成一行,然后将其保存到 csv,然后生成一行,然后将其保存等非常费力。
所以我正在尝试实现生产者和消费者线程 - 生产者将产生每一行数据(以加快进程),存储在队列中,然后单个消费者将附加到我的 csv 文件中。
我在下面的尝试有时会成功(数据被正确保存)或其他时候数据被“切断”(整行或部分数据)。
我究竟做错了什么?
from threading import Thread
from queue import Queue
import csv
q = Queue()
def producer():
datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
for data in datas:
q.put(data)
def consumer():
while True:
local = q.get()
file = open('dataset.csv','a')
with file as fd:
writer = csv.writer(fd)
writer.writerow(local)
file.close()
q.task_done()
for i in range(10):
t = Thread(target=consumer)
t.daemon = True
t.start()
producer()
q.join()
解决方案
我认为这与您正在尝试做的事情类似。出于测试目的,它会在生成的 CSV 文件中的每一行数据前加上“生产者 ID”,以便可以在结果中看到数据的来源。
正如您将能够从生成的 csv 文件中看到的那样,生成的所有数据都被放入其中。
import csv
import random
from queue import Queue
from threading import Thread
import time
SENTINEL = object()
def producer(q, id):
data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
("bye", "hat"))
for datum in data:
q.put((id,) + datum) # Prefix producer ID to datum for testing.
time.sleep(random.random()) # Vary thread speed for testing.
class Consumer(Thread):
def __init__(self, q):
super().__init__()
self.q = q
def run(self):
with open('dataset.csv', 'w', newline='') as file:
writer = csv.writer(file, delimiter=',')
while True:
datum = self.q.get()
if datum is SENTINEL:
break
writer.writerow(datum)
def main():
NUM_PRODUCERS = 10
queue = Queue()
# Create producer threads.
threads = []
for id in range(NUM_PRODUCERS):
t = Thread(target=producer, args=(queue, id+1,))
t.start()
threads.append(t)
# Create Consumer thread.
consumer = Consumer(queue)
consumer.start()
# Wait for all producer threads to finish.
while threads:
threads = [thread for thread in threads if thread.is_alive()]
queue.put(SENTINEL) # Indicate to consumer thread no more data.
consumer.join()
print('Done')
if __name__ == '__main__':
main()
推荐阅读
- r - {echarts4r} 中的 geom_label() 等价物
- angular - 什么表示谷歌分析中的空页面?以及如何解决?
- sql - SQL Oracle - 按 ID、任务 ID、最小和最大时间戳分组
- python - 如何在不在终端上打印的情况下使用 os.getcwd() 获取我的密码?
- frama-c - 如何摆脱警告:“已保存文件中的 1 个状态被忽略。在此 Frama-C 配置中无效。”?
- python - 如何更改在reportlab python中绘制表格的x,y起点?
- javascript - 从地图中获取特定数据
- android - 如何使用 Kotlin Coroutines 正确执行 http 请求?
- angular - Angular 页面优化,减少页面加载时间
- python - 将 Bokeh USCounties 示例数据与我自己的数据框中的列值合并