Need help with Python code to read JSON into Kinesis

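Problem description

I have been given a task to complete. I have a JSON file containing more than 2,000 records, and the requirements are listed at the end of this question.

My JSON file looks like this: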

{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000971728", "Payment Amount": "00000000000393.45", "BANK ID": "SBC", "Account": "0000007659007", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000829264", "Payment Amount": "00000000000211.20", "BANK ID": "SBC", "Account": "0515096412533", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00001070013", "Payment Amount": "00000000000329.72", "BANK ID": "BCOM", "Account": "017200075595", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}

My producer code looks like this:

import boto3
import json
import csv
from datetime import datetime
import calendar
import time
import random


# Reading CSV and saving as json file

csvFilePath="062019.csv"
jsonFilePath="output.json"

data=[]

with open (csvFilePath) as csvFile:
    csvReader=csv.DictReader(csvFile)
    with open(jsonFilePath,"w") as jsonfile:

        for csvRow in csvReader:
            jsonfile.write(json.dumps(csvRow)+"\n")


print(data)

# putting data to Kinesis

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')


with open('output.json', 'r') as file:
    for line in file:
        put_response=kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=line,
            PartitionKey=str(random.randrange(100)))
    
        print(put_response)

And my consumer code looks like this:

import boto3
import json
from datetime import datetime
import time

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')

response=kinesis_client.describe_stream(StreamName=my_stream_name)

my_shard_id=response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator=kinesis_client.get_shard_iterator(

                            StreamName=my_stream_name,
                            ShardId=my_shard_id,
                            ShardIteratorType='LATEST')

my_shard_iterator=shard_iterator['ShardIterator']

record_response=kinesis_client.get_records(ShardIterator=my_shard_iterator,Limit=2)

print(record_response)

while 'NextShardIterator' in record_response:
    record_response=kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],Limit=2)

    if record_response['Records']:
        print(record_response)

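But I only get one record as the result. Can someone help me with the following:

  1. I need to use put_record.
  2. The producer needs to send the data line by line (the partition key is currently PartitionKey=str(random.randrange(100))).
  3. When I run the consumer, I should get all the records as output.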

I have already had help from @John Rotenstein, thank you very much. Please help me find the exact way to get the result I need.

Tags: python, json, amazon-web-services, amazon-kinesis

Solution


Your code needs to look like this:

import boto3
import json
import random

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')

with open('foo.json', 'r') as file:
    for line in file:
        put_response=kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=line,
            PartitionKey=str(random.randrange(100)))

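If you do not want to store the whole line in Kinesis, you will need to extract the required fields from each record, similar to the code in your question.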
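As a minimal sketch of that extraction step, the code below parses each line and keeps only a few fields before calling put_record. The choice of fields and the helper names extract_fields and send_trimmed_records are assumptions made for illustration; the key names are copied from the sample records in the question.

```python
import json
import random

# Hypothetical selection of fields to keep; adjust to what the consumer needs.
FIELDS_TO_KEEP = ("Distributor Number", "Payment Amount", "Payment Date")


def extract_fields(line, fields=FIELDS_TO_KEEP):
    """Parse one JSON line and return a JSON string holding only `fields`."""
    record = json.loads(line)
    trimmed = {name: record[name] for name in fields if name in record}
    return json.dumps(trimmed)


def send_trimmed_records(kinesis_client, stream_name, path):
    """Send each non-empty line of `path` with one put_record call.

    Returns the number of records sent.
    """
    sent = 0
    with open(path, "r") as file:
        for line in file:
            if not line.strip():
                continue  # skip blank lines so json.loads does not fail
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=extract_fields(line).encode("utf-8"),
                PartitionKey=str(random.randrange(100)),
            )
            sent += 1
    return sent
```

It could be called with the client from the code above, for example send_trimmed_records(kinesis_client, my_stream_name, 'foo.json').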

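Note that I put a random number in the PartitionKey. This is common practice if the records do not need to be consumed in a specific manner. However, if the records for a given field value need to be consumed by the same consumer, then put that field in the PartitionKey. (If this doesn't make sense, see: What is partition key in AWS Kinesis all about?)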
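The two partition key strategies can be sketched side by side as follows. The helper names choose_partition_key and put_line are hypothetical, and using "Distributor Number" as the key field is only an example taken from the sample data, not a recommendation from the answer.

```python
import json
import random


def choose_partition_key(line, key_field=None):
    """Return the partition key for one JSON line.

    With key_field=None a random key is used, as in the code above.
    Otherwise the value of that field is used, so records that share
    the value are routed to the same shard.
    """
    if key_field is None:
        return str(random.randrange(100))
    record = json.loads(line)
    return str(record[key_field])


def put_line(kinesis_client, stream_name, line, key_field=None):
    """Send one line with put_record using the chosen key strategy."""
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=line.encode("utf-8"),
        PartitionKey=choose_partition_key(line, key_field),
    )
```

For example, put_line(kinesis_client, my_stream_name, line, key_field="Distributor Number") would keep all payments for one distributor on the same shard, at the cost of a less even spread if some distributors have many more records than others.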


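The answer above does not cover the consumer side, so the following is only a hedged sketch of one likely reason for seeing a single record. The consumer in the question reads only the first shard, and a LATEST iterator returns only records added after the iterator was created. With random partition keys the records are spread over all shards of the stream. The sketch reads every shard from its oldest available record (TRIM_HORIZON) and stops once a shard returns no records and reports that it has caught up. The function name read_all_records and the poll_delay parameter are assumptions, and list_shards pagination is ignored, so it assumes a stream with few shards and a producer that has already finished.

```python
import time


def read_all_records(kinesis_client, stream_name, poll_delay=0.25):
    """Collect the Data payloads from every shard, oldest record first."""
    payloads = []
    shards = kinesis_client.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        while iterator:
            response = kinesis_client.get_records(
                ShardIterator=iterator, Limit=100
            )
            payloads.extend(record["Data"] for record in response["Records"])
            # An empty batch is not proof that the shard is exhausted;
            # MillisBehindLatest == 0 indicates the reader has caught up.
            caught_up = response.get("MillisBehindLatest", 0) == 0
            if not response["Records"] and caught_up:
                break
            iterator = response.get("NextShardIterator")
            time.sleep(poll_delay)  # stay below the per-shard read limit
    return payloads
```

With the client from the code above, this could be used as: for data in read_all_records(kinesis_client, my_stream_name): print(data). A long-running consumer would normally keep polling instead of stopping when it catches up.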