首页 > 解决方案 > django并发bulk_create没有将所有值插入数据库

问题描述

我正在使用 djangobulk_create将大量数据存储到 postgresql 数据库。这是在芹菜任务中发生的。基于不同的参数值一次调用此任务的许多实例。有 4 个不同的工作人员并行运行每个任务实例。当只运行一个任务实例时,所有值都被添加到数据库中,但是当多个实例通过不同的工作人员一起运行时,只有一些值被插入

在为 bulk_create 创建模型对象列表时,我添加print了交叉检查值是否被插入到列表中。我发现该值始终添加到列表中,但在 bulk_create 之后,相同的值不会反映在 db 中。我通过在 bulk_createtime.sleep中添加延迟和参数部分解决了这个问题。batch_size但是这种解决方案并不理想,并且由于数据增加了,因此不再解决问题。

我不能发布确切的代码,但它是这样的:* celery 任务:

def some_function():
   param_lst=[1,3,..] #many values
   all_task=[]
   for i in param_lst:
      all_task.append(some_task1.si(i)) #some_task1 and some_task2 are using majorly similar code, only some data processing changes. both use bulk_create to store values
      all_task.append(some_task2.si(i))
   ch = chord( group( all_task )).set(queue="someQueue")(some_funct) #this is how the tasks are invoked

   if ch.ready():
       ch.get()

任务功能定义

@someapp.task
def some_task1(self,i):
  #process/modify data based on param i
  #we end up with json and iterate over it
  batch = []
  for k in someDict:
    result=MyModel(val1=something,...) #all values inside model
    batch.append(result)
  MyModel.objects.bulk_create(batch) #also tried with batch_size parameter

除了数据修改的一些变化外,其他任务类似。它使用相同的方式bulk_create来存储值

没有抛出错误消息。当我只为一个参数运行时,bulk_create 存储所有值,但是当为并行参数列表运行任务时,它会错过向数据库插入一些值。我在网上搜索时找不到这样的东西。我对数据库概念不太了解,对芹菜也很陌生。如果我遗漏了什么或做错了什么,请告诉我

标签: djangopython-3.xconcurrencycelerybulkinsert

解决方案


尝试使用围绕您的 bulk_create 的事务。根据后端,它可能不是事务操作。

from django.db import transaction

@someapp.task
@transaction.atomic
def some_task1(self,i):
  #process/modify data based on param i
  #we end up with json and iterate over it
  batch = []
  for k in someDict:
    result=MyModel(val1=something,...) #all values inside model
    batch.append(result)
  MyModel.objects.bulk_create(batch) #also tried with batch_size parameter

推荐阅读