首页 > 解决方案 > python循环无法识别超出第一个JSON对象

问题描述

我有以下 python 代码访问 s3 存储桶和证书。如您所见,我已经指定了希望将数据/信息发送到的 MQTT_TOPIC。稍后在代码中调用

for uuid_index, uuid in enumerate(uuid_list): 
    for data in json_data:
        CLIENT_ID = "{client_id}".format(**data)
        MQTT_TOPIC = "{organisation}/{device_vendor}/{device_type}".format(**data)
        CERTIFICATION_PATH = "{organisation}/{uuid}".format(**data)
        result = client.list_objects(Bucket=cert_bucket, Prefix=CERTIFICATION_PATH)
        print('-------------------')
        print(result)
        print('-------------------')

我的 JSON 文件如下所示 - 包含 5 个 uuid 和 2 个不同的组织:

[    
    {
        "uuid": "1597c163-6fbf-4f46-8ff6-1e9eb4f07e34",
        "organisation": "port_36",
        "device_vendor": "bitgear",
        "device_type": "IO-Air",
        "client_id": "AQ_2"
    },
    {
        "uuid": "cde2107e-8736-47de-9e87-2033c3063589",
        "organisation": "hchjffsd2immvavb7jiqtedp",
        "device_vendor": "bitgear",
        "device_type": "IO-Air",
        "client_id": "IoT_Sim_1"
    },
    {
        "uuid": "7904f39f-97b0-4a6e-bd9d-fa692c40fe6f",
        "organisation": "hchjffsd2immvavb7jiqtedp",
        "device_vendor": "bitgear",
        "device_type": "IO-Air",
        "client_id": "AQ_1"
    },
    {
        "uuid": "c7880c2c-5386-44d4-ad0e-0002224d5f95",
        "organisation": "hchjffsd2immvavb7jiqtedp",
        "device_vendor": "bitgear",
        "device_type": "IO-Air",
        "client_id": "AQ_3"
    },
    {
        "uuid": "2be124d5-c069-493d-a44f-df4f363ce012",
        "organisation": "port_36",
        "device_vendor": "bitgear",
        "device_type": "IO-Air",
        "client_id": "AQ_4"
    }
]

我已经完整地运行了我的代码,并且正在读取所有 5 个 uuid,但是每个 uuid 的信息都被发送到同一个 MQTT 主题port_36/bitgear/IO-Air(读入的第一个 JSON 对象)。如何让我的循环识别不同的 uuid 及其不同的组织以发布到 MQTT 主题?

编辑 - 回复下面评论的整个代码

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json
import os, tempfile
import boto3
from datetime import datetime
import pandas as pd
import random
import time



s3 = boto3.resource('s3')
client = boto3.client('s3')
data_bucket = '3deo-sensor-data'
key = 'simulated/config/IoT-sim-config.json'
obj = s3.Object(data_bucket, key)
data = obj.get()['Body'].read().decode('utf-8')
json_data = json.loads(data)

uuid_list = [] #list containing the uuids 
for data in json_data:
    uuid_list.append(data["uuid"])


pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
format = '%m/%d/%Y %I:%M:%S %p'
RANGE = 2


def AQ_simulator(): #function to generate random data values for air quality for each uuid
    d = datetime.now().strftime(format)
    ts = time.strptime(d, '%m/%d/%Y  %I:%M:%S %p')
    ts1 = time.mktime(ts)
    timestamp = int(ts1)
    df = pd.DataFrame()
    for uuid in uuid_list: 
        for x in range(RANGE):
            temp = pd.DataFrame({
                    'uuid': uuid,
                    'timestamp': str(timestamp),
                    'NO2 concentration': round(random.uniform(0, 0.017), 3),
                    'SO2 concentration': round(random.uniform(0, 0.022), 3),
                    'temperature': str(random.randint(3, 18)),
                    'relative humidity': str(random.randint(75, 90)),
                    'air pressure': str(random.randint(1011, 1016)),
                    'PM1.0 concentration': str(random.randint(5, 50)),
                    'PM2.5 concentration': str(random.randint(11, 26)),
                    'PM4 concentration': str(random.randint(7, 22)),
                    'PM10 concentration': str(random.randint(1, 20))[:4]}, index=[0],
                    columns = ['uuid', 'timestamp', 'NO2 concentration', 'SO2 concentration', 'temperature', 'relative humidity', 'air pressure', 'PM1.0 concentration', 'PM2.5 concentration', 'PM4 concentration', 'PM10 concentration'])
            df = pd.concat([df, temp])
    result = df.to_json(orient="records")
    parsed = json.loads(result)
    return parsed

MESSAGE = AQ_simulator()


if 'ENDPOINT' in os.environ:
    ENDPOINT = os.environ['ENDPOINT']
else:
    ENDPOINT = 'a3dd26595zqo4r-ats.iot.eu-west-1.amazonaws.com'
if 'BUCKET ' in os.environ:
    BUCKET = os.environ['BUCKET']
else:
    BUCKET = '3deo-sensor-certificates'


def lambda_handler(event, context):

    print("\nvvvvvvvvvvvvvvv")
    print(event)
    print("^^^^^^^^^^^^^^\n")
    

cert_bucket = '3deo-sensor-certificates'


for uuid_index, uuid in enumerate(uuid_list): for each uuid/json object extract the uuid, client_id, organisation, device vendor and device type data
    for data in json_data:
        uuid = "{uuid}".format(**data)
        CLIENT_ID = "{client_id}".format(**data)
        organisation = "{organisation}".format(**data)
        device_vendor = "{device_vendor}".format(**data)
        device_type = "{device_type}".format(**data)
        MQTT_TOPIC = "{organisation}/{device_vendor}/{device_type}".format(**data)
        CERTIFICATION_PATH = "{organisation}/{uuid}".format(**data) #AWS IoT topic that messages should be sent to
        result = client.list_objects(Bucket=cert_bucket, Prefix=CERTIFICATION_PATH)
        print('-------------------')
        print(result)
        print('-------------------')
            
        if 'Contents' in result: contents of s3 bucket based on above info
            list_out = []
            for key in result['Contents']:
                ## Return the files within the main /data directory - IGNORE any other sub directories.
                if not key['Key'].endswith('/'):
                    list_out.append(key['Key'])
            print(list_out)
        with tempfile.TemporaryDirectory() as tmpdir:
            os.chdir(tmpdir)
            for item in list_out:
                if 'certificate.pem.crt' in item:
                    cert_file = item.rsplit('/')[-1]  # split / grab the filename only
                    client.download_file(cert_bucket, item, cert_file)  # copy file to local storage
                    PATH_TO_CERT = os.getcwd() + '/' + cert_file  # configure path to file
                elif 'private.pem.key' in item:
                    key_file = item.rsplit('/')[-1]
                    client.download_file(cert_bucket, item, key_file)
                    PATH_TO_KEY = os.getcwd() + '/' + key_file
                elif 'AmazonRootCA1.pem' in item:
                    root_file = item.rsplit('/')[-1]
                    client.download_file(cert_bucket, item, root_file)
                    PATH_TO_ROOT = os.getcwd() + '/' + root_file
            print(PATH_TO_CERT)
            print(PATH_TO_KEY)
            print(PATH_TO_ROOT)
            # Spin up resources
            event_loop_group = io.EventLoopGroup(1)
            host_resolver = io.DefaultHostResolver(event_loop_group)
            client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
            mqtt_connection = mqtt_connection_builder.mtls_from_path(
                endpoint=ENDPOINT,
                cert_filepath=PATH_TO_CERT,
                pri_key_filepath=PATH_TO_KEY,
                client_bootstrap=client_bootstrap,
                ca_filepath=PATH_TO_ROOT,
                client_id=CLIENT_ID,
                clean_session=False,
                keep_alive_secs=6
            )
            print("Connecting to {} with client ID '{}'...".format(
                ENDPOINT, CLIENT_ID))
            # Make the connect() call
            connect_future = mqtt_connection.connect()
            # Future.result() waits until a result is available
            connect_future.result()
            print("Connected!")
            # Publish message to server desired number of times.
            print('Begin Publish')
    for data in MESSAGE[uuid_index * RANGE:(uuid_index+1) * RANGE]: #for each uuid, the generated messages in the very first loop should be published based on the MQTT_topic derived from each uuids info
        mqtt_connection.publish(topic=MQTT_TOPIC, payload=json.dumps(data), qos=mqtt.QoS.AT_LEAST_ONCE)
        print("Published: '" + json.dumps(data) + "' to the topic: " + MQTT_TOPIC)
        t.sleep(3)
print('Publish End')
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
    

标签: pythonjsonboto3

解决方案


推荐阅读