Producer-consumer problem: trying to save to a CSV file

Problem description

So this seemingly simple problem is getting the better of me.

I have a dataset, datas. However, generating one row, saving it to the CSV, generating the next row, saving it, and so on is very slow.

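So I'm trying to implement producer and consumer threads: the producer generates each row of data (to speed up the process) and puts it on a queue, and a single consumer then appends it to my CSV file.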

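My attempt below sometimes works (the data is saved correctly), but other times the data gets "cut off" (whole rows or parts of rows are missing).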

What am I doing wrong?


from threading import Thread
from queue import Queue
import csv

q = Queue()

def producer():
    datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
    for data in datas:
        q.put(data)


def consumer():
    while True:
        local = q.get()
        file = open('dataset.csv','a')
        with file as fd:
            writer = csv.writer(fd)
            writer.writerow(local)
        file.close()

        q.task_done()


for i in range(10):
    t = Thread(target=consumer)
    t.daemon = True
    t.start()

producer()

q.join()

Tags: python, concurrency, producer-consumer

Solution


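I think this is similar to what you're trying to do. For testing purposes, it prefixes each row of data in the generated CSV file with a "producer ID", so you can see in the results where each row came from.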

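As you'll be able to see from the generated CSV file, all of the data produced ends up in it. Note that there is only one consumer thread, and it opens dataset.csv once and keeps it open while writing. Your code starts ten consumer threads (not a single one) that each reopen the file and append to it concurrently, which is the most likely reason rows get cut off.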
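To confirm that nothing is cut off, the file can be checked once main() has finished: every row should have three fields (producer ID plus two words), and each of the 10 producer IDs should appear exactly 5 times. A minimal sketch, assuming the dataset.csv filename and the setup of 10 producers with five rows each used in the code; check_csv is a hypothetical helper, not part of the original answer.

```python
import csv
from collections import Counter


def check_csv(path="dataset.csv", num_producers=10, rows_per_producer=5):
    """Hypothetical helper: True if no row is truncated and none is missing."""
    with open(path, newline="") as f:
        rows = [row for row in csv.reader(f) if row]
    # Every row should be: producer ID, word, word (nothing truncated).
    complete = all(len(row) == 3 for row in rows)
    # Each producer ID (written as 1..num_producers) should appear the same number of times.
    counts = Counter(row[0] for row in rows)
    expected = Counter({str(i): rows_per_producer for i in range(1, num_producers + 1)})
    return complete and counts == expected
```

Because the producers sleep for random intervals, the order of rows in the file varies from run to run; the check therefore counts rows per producer rather than comparing the file line by line.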

import csv
import random
from queue import Queue
from threading import Thread
import time

SENTINEL = object()

def producer(q, id):
    data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
            ("bye", "hat"))
    for datum in data:
        q.put((id,) + datum)  # Prefix producer ID to datum for testing.
        time.sleep(random.random())  # Vary thread speed for testing.


class Consumer(Thread):
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        with open('dataset.csv', 'w', newline='') as file:
            writer = csv.writer(file, delimiter=',')
            while True:
                datum = self.q.get()
                if datum is SENTINEL:
                    break
                writer.writerow(datum)

def main():
    NUM_PRODUCERS = 10
    queue = Queue()

    # Create producer threads.
    threads = []
    for id in range(NUM_PRODUCERS):
        t = Thread(target=producer, args=(queue, id+1,))
        t.start()
        threads.append(t)

    # Create Consumer thread.
    consumer = Consumer(queue)
    consumer.start()

    # Wait for all producer threads to finish (join() blocks without busy-waiting).
    for t in threads:
        t.join()

    queue.put(SENTINEL)  # Indicate to consumer thread no more data.
    consumer.join()
    print('Done')

if __name__ == '__main__':
    main()
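If you'd rather keep the structure of your original code (a daemon consumer thread, q.task_done() and q.join()), a minimal sketch of the same fix is to start exactly one consumer thread and have it open the file once, before its loop. The write_rows function and its path parameter are hypothetical names used for illustration; newline="" follows the csv module documentation's recommendation for files passed to csv.writer.

```python
import csv
from queue import Queue
from threading import Thread


def write_rows(datas, path="dataset.csv"):
    """Hypothetical helper: queue each row and let ONE consumer thread append it."""
    q = Queue()

    def consumer():
        # The single consumer opens the file once and keeps it open.
        with open(path, "a", newline="") as fd:
            writer = csv.writer(fd)
            while True:
                row = q.get()
                writer.writerow(row)
                fd.flush()  # push the row out of Python's buffer before task_done()
                q.task_done()

    # Exactly one consumer thread, so no two threads ever write to the file at once.
    Thread(target=consumer, daemon=True).start()

    for data in datas:  # producer side (here, simply the calling thread)
        q.put(data)

    q.join()  # returns once task_done() has been called for every queued row
```

For example, write_rows([["hello", "world"], ["test", "hey"]]) appends those two rows to dataset.csv in order. The trade-off is that the daemon consumer stays blocked on q.get() with the file open until the program exits; the sentinel approach in the answer's code shuts the consumer down cleanly instead.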

