python - 在 python 代码上需要帮助来读取 json 到 kinesis
问题描述
我收到了要完成的任务。我有一个包含 2000 多条记录的 Json 文件。所以要求是:
- 从 CSV 读取
- 形成一个 JSON 有效载荷(需要覆盖适当数量的字段,考虑订单头字段)
- 推送到 Kinesis(确保每一行都有自己的正确密钥)
- 从 Kinesis 中检索相同的内容。
我的 Json 文件如下所示:
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000971728", "Payment Amount": "00000000000393.45", "BANK ID": "SBC", "Account": "0000007659007", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000829264", "Payment Amount": "00000000000211.20", "BANK ID": "SBC", "Account": "0515096412533", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00001070013", "Payment Amount": "00000000000329.72", "BANK ID": "BCOM", "Account": "017200075595", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
我的生产者代码如下所示:
import boto3
import json
import csv
from datetime import datetime
import calendar
import time
import random
# Reading CSV and saving as json file
csvFilePath="062019.csv"
jsonFilePath="output.json"
data=[]
with open (csvFilePath) as csvFile:
csvReader=csv.DictReader(csvFile)
with open(jsonFilePath,"w") as jsonfile:
for csvRow in csvReader:
jsonfile.write(json.dumps(csvRow)+"\n")
print(data)
# putting data to Kinesis
my_stream_name='ApacItTeamTstOrderStream'
kinesis_client=boto3.client('kinesis',region_name='us-east-1')
with open('output.json', 'r') as file:
for line in file:
put_response=kinesis_client.put_record(
StreamName=my_stream_name,
Data=line,
PartitionKey=str(random.randrange(100)))
print(put_response)
像这样的消费者代码:
import boto3
import json
from datetime import datetime
import time
my_stream_name='ApacItTeamTstOrderStream'
kinesis_client=boto3.client('kinesis',region_name='us-east-1')
response=kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id=response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator=kinesis_client.get_shard_iterator(
StreamName=my_stream_name,
ShardId=my_shard_id,
ShardIteratorType='LATEST')
my_shard_iterator=shard_iterator['ShardIterator']
record_response=kinesis_client.get_records(ShardIterator=my_shard_iterator,Limit=2)
print(record_response)
while 'NextShardIterator' in record_response:
record_response=kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],Limit=2)
if record_response['Records']:
print(record_response)
但我得到了一条记录的结果。有人可以帮我吗:
- 需要使用 PUT 记录
- 需要逐行获取生产者数据(现在分区键用作(
PartitionKey=str(random.randrange(100))
) - 当我运行消费者时,我应该得到所有记录的输出。
我得到了@john Rotenstein 的帮助,非常感谢你,请帮助我获得我需要的结果的确切方式。
解决方案
您的代码需要如下所示:
import boto3
import json
import random
my_stream_name='ApacItTeamTstOrderStream'
kinesis_client=boto3.client('kinesis',region_name='us-east-1')
with open('foo.json', 'r') as file:
for line in file:
put_response=kinesis_client.put_record(
StreamName=my_stream_name,
Data=line,
PartitionKey=str(random.randrange(100)))
如果您不希望将整行存储在 Kinesis 中,那么您需要提取与问题中的代码类似的所需记录。
请注意,我在PartitionKey
. 如果不需要以特定方式使用记录,这是常见的做法。但是,如果给定字段的记录需要由同一消费者使用,则将该字段放在PartitionKey
. (如果这没有意义,请参阅:AWS Kinesis 中的分区键是什么?)
推荐阅读
- string - 更高版本手机中的 droidninja 文件选择器它给了我空错误任何想法如何解决它
- python - 无法安装 python-docx (MacOS)
- google-api - 即使在请求中指定了“字段”参数,Google Place by Id 调用也会返回所有字段
- google-apps-script - 仅强制复制和粘贴文本 G-Sheet
- python - python中os.stat和os.fstat的inode数量的区别
- xamarin.forms - 如何在 xamarin.forms 中为选项卡式页面应用渐变色
- apache-kafka - 冗余分布式 NFS 文件系统和复制因子 > 1. Kafka 部署是否安全?
- android - 如何更改底部导航活动菜单布局文件的设计选项卡的视图(工具:showIn)
- haskell - Haskell中的有限域和有理线性代数
- security - 如何通过 GitHub API 或 GraphQL 启用漏洞警报