python - python循环无法识别超出第一个JSON对象
问题描述
我有以下 python 代码访问 s3 存储桶和证书。如您所见,我已经指定了希望将数据/信息发送到的 MQTT_TOPIC。稍后在代码中调用
for uuid_index, uuid in enumerate(uuid_list):
for data in json_data:
CLIENT_ID = "{client_id}".format(**data)
MQTT_TOPIC = "{organisation}/{device_vendor}/{device_type}".format(**data)
CERTIFICATION_PATH = "{organisation}/{uuid}".format(**data)
result = client.list_objects(Bucket=cert_bucket, Prefix=CERTIFICATION_PATH)
print('-------------------')
print(result)
print('-------------------')
我的 JSON 文件如下所示 - 包含 5 个 uuid 和 2 个不同的组织:
[
{
"uuid": "1597c163-6fbf-4f46-8ff6-1e9eb4f07e34",
"organisation": "port_36",
"device_vendor": "bitgear",
"device_type": "IO-Air",
"client_id": "AQ_2"
},
{
"uuid": "cde2107e-8736-47de-9e87-2033c3063589",
"organisation": "hchjffsd2immvavb7jiqtedp",
"device_vendor": "bitgear",
"device_type": "IO-Air",
"client_id": "IoT_Sim_1"
},
{
"uuid": "7904f39f-97b0-4a6e-bd9d-fa692c40fe6f",
"organisation": "hchjffsd2immvavb7jiqtedp",
"device_vendor": "bitgear",
"device_type": "IO-Air",
"client_id": "AQ_1"
},
{
"uuid": "c7880c2c-5386-44d4-ad0e-0002224d5f95",
"organisation": "hchjffsd2immvavb7jiqtedp",
"device_vendor": "bitgear",
"device_type": "IO-Air",
"client_id": "AQ_3"
},
{
"uuid": "2be124d5-c069-493d-a44f-df4f363ce012",
"organisation": "port_36",
"device_vendor": "bitgear",
"device_type": "IO-Air",
"client_id": "AQ_4"
}
]
我已经完整地运行了我的代码,并且正在读取所有 5 个 uuid,但是每个 uuid 的信息都被发送到同一个 MQTT 主题port_36/bitgear/IO-Air
(读入的第一个 JSON 对象)。如何让我的循环识别不同的 uuid 及其不同的组织以发布到 MQTT 主题?
编辑 - 回复下面评论的整个代码
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json
import os, tempfile
import boto3
from datetime import datetime
import pandas as pd
import random
import time
s3 = boto3.resource('s3')
client = boto3.client('s3')
data_bucket = '3deo-sensor-data'
key = 'simulated/config/IoT-sim-config.json'
obj = s3.Object(data_bucket, key)
data = obj.get()['Body'].read().decode('utf-8')
json_data = json.loads(data)
uuid_list = [] #list containing the uuids
for data in json_data:
uuid_list.append(data["uuid"])
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
format = '%m/%d/%Y %I:%M:%S %p'
RANGE = 2
def AQ_simulator(): #function to generate random data values for air quality for each uuid
d = datetime.now().strftime(format)
ts = time.strptime(d, '%m/%d/%Y %I:%M:%S %p')
ts1 = time.mktime(ts)
timestamp = int(ts1)
df = pd.DataFrame()
for uuid in uuid_list:
for x in range(RANGE):
temp = pd.DataFrame({
'uuid': uuid,
'timestamp': str(timestamp),
'NO2 concentration': round(random.uniform(0, 0.017), 3),
'SO2 concentration': round(random.uniform(0, 0.022), 3),
'temperature': str(random.randint(3, 18)),
'relative humidity': str(random.randint(75, 90)),
'air pressure': str(random.randint(1011, 1016)),
'PM1.0 concentration': str(random.randint(5, 50)),
'PM2.5 concentration': str(random.randint(11, 26)),
'PM4 concentration': str(random.randint(7, 22)),
'PM10 concentration': str(random.randint(1, 20))[:4]}, index=[0],
columns = ['uuid', 'timestamp', 'NO2 concentration', 'SO2 concentration', 'temperature', 'relative humidity', 'air pressure', 'PM1.0 concentration', 'PM2.5 concentration', 'PM4 concentration', 'PM10 concentration'])
df = pd.concat([df, temp])
result = df.to_json(orient="records")
parsed = json.loads(result)
return parsed
MESSAGE = AQ_simulator()
if 'ENDPOINT' in os.environ:
ENDPOINT = os.environ['ENDPOINT']
else:
ENDPOINT = 'a3dd26595zqo4r-ats.iot.eu-west-1.amazonaws.com'
if 'BUCKET ' in os.environ:
BUCKET = os.environ['BUCKET']
else:
BUCKET = '3deo-sensor-certificates'
def lambda_handler(event, context):
print("\nvvvvvvvvvvvvvvv")
print(event)
print("^^^^^^^^^^^^^^\n")
cert_bucket = '3deo-sensor-certificates'
for uuid_index, uuid in enumerate(uuid_list): for each uuid/json object extract the uuid, client_id, organisation, device vendor and device type data
for data in json_data:
uuid = "{uuid}".format(**data)
CLIENT_ID = "{client_id}".format(**data)
organisation = "{organisation}".format(**data)
device_vendor = "{device_vendor}".format(**data)
device_type = "{device_type}".format(**data)
MQTT_TOPIC = "{organisation}/{device_vendor}/{device_type}".format(**data)
CERTIFICATION_PATH = "{organisation}/{uuid}".format(**data) #AWS IoT topic that messages should be sent to
result = client.list_objects(Bucket=cert_bucket, Prefix=CERTIFICATION_PATH)
print('-------------------')
print(result)
print('-------------------')
if 'Contents' in result: contents of s3 bucket based on above info
list_out = []
for key in result['Contents']:
## Return the files within the main /data directory - IGNORE any other sub directories.
if not key['Key'].endswith('/'):
list_out.append(key['Key'])
print(list_out)
with tempfile.TemporaryDirectory() as tmpdir:
os.chdir(tmpdir)
for item in list_out:
if 'certificate.pem.crt' in item:
cert_file = item.rsplit('/')[-1] # split / grab the filename only
client.download_file(cert_bucket, item, cert_file) # copy file to local storage
PATH_TO_CERT = os.getcwd() + '/' + cert_file # configure path to file
elif 'private.pem.key' in item:
key_file = item.rsplit('/')[-1]
client.download_file(cert_bucket, item, key_file)
PATH_TO_KEY = os.getcwd() + '/' + key_file
elif 'AmazonRootCA1.pem' in item:
root_file = item.rsplit('/')[-1]
client.download_file(cert_bucket, item, root_file)
PATH_TO_ROOT = os.getcwd() + '/' + root_file
print(PATH_TO_CERT)
print(PATH_TO_KEY)
print(PATH_TO_ROOT)
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=PATH_TO_CERT,
pri_key_filepath=PATH_TO_KEY,
client_bootstrap=client_bootstrap,
ca_filepath=PATH_TO_ROOT,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=6
)
print("Connecting to {} with client ID '{}'...".format(
ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
# Publish message to server desired number of times.
print('Begin Publish')
for data in MESSAGE[uuid_index * RANGE:(uuid_index+1) * RANGE]: #for each uuid, the generated messages in the very first loop should be published based on the MQTT_topic derived from each uuids info
mqtt_connection.publish(topic=MQTT_TOPIC, payload=json.dumps(data), qos=mqtt.QoS.AT_LEAST_ONCE)
print("Published: '" + json.dumps(data) + "' to the topic: " + MQTT_TOPIC)
t.sleep(3)
print('Publish End')
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
解决方案
推荐阅读
- php - 即使在其他页面上尝试登录,也会在登录页面上显示错误
- python-3.x - 如何从 PostgreSQL 服务器端游标获取 psycopg2 的描述
- javascript - 无法在步骤定义中使用 pageobject 方法读取未定义的属性“enterText”
- html - 为什么我的背景延伸到页面顶部的边缘,而不是底部?
- amazon-web-services - 如何创建 AWS 安全组,仅限制来自 Google Cloud 上 Kubernetes 的入站流量?
- android - 将 Android Studio 用于不兼容的设备
- javascript - 当仅使用单字符、双字符或 rand 字符时,防止 google geocoder API 返回状态“OK”
- javascript - 如何在活动页面上设置图标?
- google-chrome-extension - 仅在扩展中临时存储当前选项卡
- javascript - 在 Get 请求期间从 Node.js 中的 JSON 数组中读取值