python - AWS IoT Core:能够从设备连接到 AWS,但已发布的消息未显示在控制台中并且订阅主题无限期挂起
问题描述
物联网核心非常新。所以我基本上只是重新利用 AWS SDK 示例 pubsub.py 代码(https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py)但是出于某种原因,即使我可以建立与 AWS 的连接,我也无法:
- 订阅主题
执行时iot_connection.subscribe()
,终端会显示“Subscribing to topic test/topic...”并无限期挂起。请注意,如果我不包含subscribe_result = subscribe_future.result()
下面的完整代码,则会成功执行,但同样,控制台中不会收到任何消息。附带问题:您实际上必须订阅一个主题才能发布到它吗?
- 在控制台中查看消息
执行时iot_connection.publish()
,一切似乎都已成功运行,但是,AWS 测试控制台中未显示“Hello, World”,即使我订阅了“#”和“test/topic”。
任何帮助将非常感激!
代码:
import argparse
from uuid import uuid4
import json
import time
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
def make_parser():
parser = argparse.ArgumentParser(description="Send and receive messages through and MQTT connection.")
parser.add_argument('endpoint', help="Your AWS IoT custom endpoint, not including a port.")
parser.add_argument('--port', type=int, help="Specify port. AWS IoT supports 443 and 8883.", metavar='')
parser.add_argument('--cert', help="File path to your client certificate, in PEM format.", metavar='')
parser.add_argument('--key', help="File path to your private key, in PEM format.", metavar='')
parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format.", metavar='')
parser.add_argument('--client-id', default="test-" + str(uuid4()), help="Client ID for MQTT connection.",
metavar='')
parser.add_argument('--topic', default="test/topic", help="Topic to subscribe to, and publish messages to.",
metavar='')
parser.add_argument('--message', default="Hello World!", help="Message to publish. ", metavar='')
parser.add_argument('--count', default=10, type=int, help="Number of messages to publish.", metavar='')
return parser
class IoT:
def __init__(self, args):
self.endpoint = args.endpoint
self.port = args.port
self.cert = args.cert
self.key = args.key
self.root_ca = args.root_ca
self.client_id = args.client_id
self.topic = args.topic
self.message = args.message
self.count = args.count
print("Initializing parameters...")
def __enter__(self):
print("Spinning up resources...")
self.event_loop_group = io.EventLoopGroup(1)
self.host_resolver = io.DefaultHostResolver(self.event_loop_group)
self.client_bootstrap = io.ClientBootstrap(self.event_loop_group, self.host_resolver)
print("Establishing connection to AWS...")
self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=self.endpoint,
port=self.port,
cert_filepath=self.cert,
pri_key_filepath=self.key,
ca_filepath=self.root_ca,
client_id=self.client_id,
client_bootstrap=self.client_bootstrap,
clean_session=False,
keep_alive_secs=30
)
connect_future = self.mqtt_connection.connect()
connect_future.result()
print("Connected!")
def __exit__(self, exception_type, exception_value, traceback):
print("Disconnecting...")
disconnect_future = self.mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
def subscribe(self):
print("Subscribing to topic {}...".format(self.topic))
subscribe_future, packet_id = self.mqtt_connection.subscribe(
topic=self.topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_message_recieved
)
subscribe_result = subscribe_future.result()
print("Result: {}".format(str(subscribe_result['qos'])))
print("Subscribed!")
def on_message_recieved(self, topic, paylod, dup, qos, retain, **kwargs):
print("Recieved message")
def publish(self):
if self.message:
print("Publishing message to topic '{}': {}".format(self.topic, self.message))
message = "{} [{}]".format(self.message, self.count)
message_json = json.dumps(message)
self.mqtt_connection.publish(
topic=self.topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE
)
time.sleep(1) # thought this might fix things, it did not
if __name__ == '__main__':
print("Gathering user inputs...")
parser = make_parser()
args = parser.parse_args()
# Starting IoT Core connection
iot_connection = IoT(args)
with iot_connection:
iot_connection.subscribe() # subscribing to topic
iot_connection.publish() # publishing message hopefully
解决方案
期间解决了。这一定是我的资源 ARN 的问题 - 当我创建新策略并将资源 ARN 设置为“*”时,一切都按预期工作。以前我曾尝试将 ARN 限制为特定的客户端 ID。不知道为什么这不起作用,因为据我所知,我正确处理了客户端 ID。
推荐阅读
- r - 将 CSV 作为字符串直接读入 R
- javascript - React.createRef 不是 react-rails 中的函数
- android - 如何以编程方式连接到 Android 中的 WPA2-PSK Wi-Fi 网络?
- android - 如何避免在两个不同的片段中同时调用两个相同的事件
- vba - 清除选定范围内一行中的重复值
- php - Laravel 5.4:在 Laravel Eloquent 中转换原始 SQL 查询
- c - 如何通过指向指针的指针访问结构的字段?
- java - 如何在 Spring Boot Webflux 中克隆 ServerResponse
- python - Tensorflow 初始化变量
- c# - 为什么 .ToList() 和 AsEnumerable() 使我的查询区分大小写但 AsQueryable() 不是?