python-3.x - ValueError:不能腌制包含指针的 ctypes 对象
问题描述
我是 Python 新手(总共 2 个月的编程/学习经验)。在这段代码中,我所做的只是从 MSSQL 数据库中获取一些数据并将其传输到 DynamDB。我只想知道为什么我会收到此错误: ValueError: ctypes objects contains pointers cannot be pickled 它发生在这一行:p.map(self.import_sample_data_dynamo, list_to_batch)。import_sample_data_dynamo:它是 Dynamo 批处理的功能。list_to_batch:它的字典列表。有人可以告诉我我做错了什么。
class GetSensorsSamplesSetToDynamoDBTable:
def __init__(self):
self.client = None
self.db = None
# MSSQL
self.connection = None
self.cursor: Cursor = None
def init(self):
# MSSQL
connect_string = 'Driver={SQL Server};' \
'Server=xxxx;' 'Database=xxx;' \
'uid=xxx;pwd=xxx'
self.connection = pypyodbc.connect(connect_string)
self.cursor = self.connection.cursor()
# DynamoDB
dynamodb = boto3.resource('dynamodb')
self.table = dynamodb.Table('xxx')
def cleanup(self):
# MSSQL
self.cursor.close()
self.connection.close()
def do_work(self):
self.init()
data = []
samples = self.get_files_received_by_ftp_prod2_data()
for sample in samples:
sample_id = sample['id']
project_id = sample['projectid']
sensor_id = sample['sensorid']
sample_time = sample['sampletime']
row = {"_id": sample_id, 'ProjectID': project_id, 'SensorID': sensor_id,
'Sample_Time': sample_time,
'Z_Fields': sample}
data.append(row)
self.split_for_batch(data)
# self.import_sample_data_dynamo(data)
def get_files_received_by_ftp_prod2_data(self):
sql_cmd = f"SELECT TOP (1000) * FROM FilesReceivedByFTP_Prod2"
self.cursor.execute(sql_cmd)
records = self.cursor.fetchall()
samples = []
records = list(records)
for res in records:
samples_data = {self.cursor.description[i][0]: res[i] for i in range(len(res))}
self.fix_bad_fields(samples_data)
samples.append(samples_data)
return samples
def split_for_batch(self, data):
temp_list = []
list_to_batch = []
while len(data) > 0:
temp_list.append(data[0])
data.pop(0)
if len(temp_list) > 24 or len(data) is 0:
list_to_batch.append(temp_list)
temp_list = []
print(len(data))
print(len(list_to_batch))
start_time = time.time()
num_workers = multiprocessing.cpu_count()
p = Pool(num_workers - 1)
p.map(self.import_sample_data_dynamo, list_to_batch)
p.close()
p.join()
elapsed = time.time() - start_time
print(f"read_all_samples elapsed {elapsed:.0F} Seconds")
def import_sample_data_dynamo(self, data):
with self.table.batch_writer() as batch:
for item in data:
ddb_data = json.loads(json.dumps(item, default=json_util.default),
parse_float=Decimal, object_hook=json_util.object_hook)
batch.put_item(Item=ddb_data)
return True
def fix_bad_fields(self, data):
for k, v in data.items():
if v == '':
data[k] = '---'
# elif type(v) == type(datetime.datetime.now()):
# # data[k] = v.strftime("%d/%m/%Y, %H:%M:%S")
# data[k] = v.timestamp()
elif type(v) is bytearray:
data[k] = "bytearray"
if __name__ == '__main__':
freeze_support()
worker = GetSensorsSamplesSetToDynamoDBTable()
worker.do_work()
解决方案
推荐阅读
- git - 'git gui' / 'git citool' 是否能够触发 git hooks 预提交脚本?
- machine-learning - 我有大约 2000 个用于时间序列预测的多个位置的数据点。我可以在上面应用 LSTM 模型吗?
- ios - 使用 MapKit 和 SwiftUI、Xcode 12 在默认 iOS 地图上移动视图时如何修复“为不可见的矩形请求样式 Z”的终端输出
- c++ - 我尝试使用placement new,但总是遵守错误
- sql - 从 PostgreSQL 中的一组 id 中选择第一个元素
- uwp - UWP 应用未通过 Windows 应用认证工具包测试:Windows 运行时元数据验证
- django - 为什么这“是”比较失败
- sql-server - 一个表到另一个表插入只检查特定字段重复
- python - 如何根据服务主体 ID 限制对 azure 函数的访问
- php - 在 WooCommerce 订单和电子邮件中保存并显示产品自定义元数据