首页 > 解决方案 > IoT 中心设备未接收到所有消息,即缺少一些云到设备 (C2D) 消息

问题描述

我编写了一个示例 .net 5.0 应用程序来了解 IoT 中心,特别是云到设备的消息。该程序使用以下两个 Nuget 包

该程序首先向 IoT 中心注册一个新设备(GUID)。然后它通过将相关 id 设置为 MsgId 1..50 向它发送 50 条消息

发送的每条消息从 UTC Now 算起的过期时间为 1 小时。服务端的 defaultTtlAsIso8601 也在 PT1H 即 1 小时。

所有 50 条消息均已发送,没有任何问题。我已经验证了在 azure 门户 -> IoT Hub -> Explorer -> IoT Devices 上的 Cloud To Device 消息计数

然后程序使用下面的代码打开与设备特定队列的连接。

问题是它在设备/接收器端仅接收 50 条已发送消息中的大约 40 条(代码以及下面的示例输出)。

我还尝试了一种变体,而不是SetReceiveMessageHandlerAsync我使用的设置s_deviceClient.OpenAsync(),然后var receivedMessage = await s_deviceClient.ReceiveAsync(TimeSpan.FromSeconds(1));它产生相同的结果,即接收方丢失了前约 10 条消息。

每次我运行程序(示例输出也在下面)时,它会丢失大约 10 条消息。谁能建议还有什么要看的?

发件人

        private static async Task SendCloudToDeviceMessages(Guid deviceId, int totalMessageCount)
        {
            var serviceClient = ServiceClient.CreateFromConnectionString(iotHubConfiguration.GetConnectionString(PermissionType.serviceconnect));

            var counter = 1;
            var sDeviceId = deviceId.ToString();
            while (counter != -1)
            {
                var commandText = $"Message No. {counter}";
                var messagePayload = $"{{\"commandText\":\"{commandText}\"}}";
                var eventMessage = new Message(Encoding.UTF8.GetBytes(messagePayload))
                {
                    ContentEncoding = Encoding.UTF8.ToString(),
                    ContentType = "application/json",
                    CorrelationId = $"MsgId {counter}",
                    ExpiryTimeUtc = DateTime.Now.AddHours(1)
                };

                try
                {
                    Console.WriteLine($"Sending {commandText}");
                    await serviceClient.SendAsync(sDeviceId, eventMessage);
                    if(counter == totalMessageCount)
                    {
                        counter = -1;
                    }
                    else
                    {
                        counter++;
                    }
                }
                catch (Exception e) // 51st message will cause a exception with error 403004 as c2d queue limit is 50
                {
                    Console.WriteLine(e);
                    counter = -1;
                }
            }
        }


        public string GetConnectionString(PermissionType permission)
        {
            Guard.IsNotNullOrWhiteSpace(IoTHubHostName, nameof(IoTHubHostName));

            var permissionString = Enum.GetName(typeof(PermissionType), permission);
            Guard.IsNotNullOrWhiteSpace(permissionString, nameof(permissionString));

            if (SasPolicies == null || SasPolicies.Count == 0 || !SasPolicies.Any(p => p.Permissions.Equals(permissionString, StringComparison.OrdinalIgnoreCase)))
            {
                throw new InvalidOperationException($"Sas Policies not defined for permission {permission}");
            }

            var policy = SasPolicies.First(p => p.Permissions.Equals(permissionString, StringComparison.OrdinalIgnoreCase));

            return $"HostName={IoTHubHostName};SharedAccessKeyName={policy.KeyName};SharedAccessKey={policy.AccessKey}";
        }


接收者

    public class CloudMessageReceiver
    {
        private DeviceClient s_deviceClient;
        private ConnectionStatus status;

        private AzureDevice device;
        private int receiverNumber;

        public CloudMessageReceiver(Guid deviceId, AzureDevice device, int receiverNumber)
        {
            var connectionstring = $"HostName={device.IoTHubHostName};DeviceId={deviceId};SharedAccessKey={device.PrimarySymmetricKey}";
            s_deviceClient = DeviceClient.CreateFromConnectionString(connectionstring, TransportType.Mqtt);
            s_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangesHandler);

            this.device = device;
            this.receiverNumber = receiverNumber;
        }

        public async Task ReceiveCommands(CancellationToken ct)
        {
            DisplayMessage($"Starting");
            await s_deviceClient.SetReceiveMessageHandlerAsync(ReceiveMessageAndCompleteAsync, device, ct);
            while (!ct.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromSeconds(10));
            }

            DisplayMessage($"exiting loop.");
            await s_deviceClient.CloseAsync();
        }

        private async Task ReceiveMessageAndCompleteAsync(Message receivedMessage, object userContext)
        {
            try
            {
                ProcessMessage(receivedMessage);
                await s_deviceClient.CompleteAsync(receivedMessage);
            }
            catch (Exception e)
            {
                DisplayMessage($"ReceiveMessageAndCompleteAsync {e}");
            }
        }

        private void ProcessMessage(Message receivedMessage)
        {
            string messageData = Encoding.UTF8.GetString(receivedMessage.GetBytes());
            DisplayMessage($"Received message - dCount {receivedMessage.DeliveryCount} - msgId/corId/lockT {receivedMessage.MessageId}/{receivedMessage.CorrelationId}/{receivedMessage.LockToken} - data {messageData}");
        }

        private void ConnectionStatusChangesHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
        {
            DisplayMessage($"connection status change {reason}");
            this.status = status;
        }

        private void DisplayMessage(string message)
        {
            Console.WriteLine($"{DateTime.UtcNow.ToString("O")}; Receiver {receiverNumber}; {status}; {message}");
        }
    }

程序的示例输出- 注意这次我错过了相关 id 为 1..8 的消息

New Device Id - 541905f6-b873-486b-9503-ec91c85d980e
Sending Message No. 1
Sending Message No. 2
Sending Message No. 3
Sending Message No. 4
Sending Message No. 5
Sending Message No. 6
Sending Message No. 7
Sending Message No. 8
Sending Message No. 9
Sending Message No. 10
Sending Message No. 11
Sending Message No. 12
Sending Message No. 13
Sending Message No. 14
Sending Message No. 15
Sending Message No. 16
Sending Message No. 17
Sending Message No. 18
Sending Message No. 19
Sending Message No. 20
Sending Message No. 21
Sending Message No. 22
Sending Message No. 23
Sending Message No. 24
Sending Message No. 25
Sending Message No. 26
Sending Message No. 27
Sending Message No. 28
Sending Message No. 29
Sending Message No. 30
Sending Message No. 31
Sending Message No. 32
Sending Message No. 33
Sending Message No. 34
Sending Message No. 35
Sending Message No. 36
Sending Message No. 37
Sending Message No. 38
Sending Message No. 39
Sending Message No. 40
Sending Message No. 41
Sending Message No. 42
Sending Message No. 43
Sending Message No. 44
Sending Message No. 45
Sending Message No. 46
Sending Message No. 47
Sending Message No. 48
Sending Message No. 49
Sending Message No. 50
Press any key to receive

Press control-C to exit.
DateTime;Receiver;Status;Message
2021-10-18T23:52:31.1711895Z; Receiver 0; Disconnected; Starting
2021-10-18T23:52:31.8820154Z; Receiver 0; Disconnected; connection status change Connection_Ok
2021-10-18T23:52:31.9666838Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 9/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd10 - data {"commandText":"Message No. 9"}
2021-10-18T23:52:31.9740397Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 10/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd11 - data {"commandText":"Message No. 10"}
2021-10-18T23:52:31.9744397Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 11/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd12 - data {"commandText":"Message No. 11"}
2021-10-18T23:52:31.9749889Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 12/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd13 - data {"commandText":"Message No. 12"}
2021-10-18T23:52:31.9755836Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 13/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd14 - data {"commandText":"Message No. 13"}
2021-10-18T23:52:31.9842039Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 14/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd15 - data {"commandText":"Message No. 14"}
2021-10-18T23:52:31.9884916Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 15/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd16 - data {"commandText":"Message No. 15"}
2021-10-18T23:52:31.9888335Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 16/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd17 - data {"commandText":"Message No. 16"}
2021-10-18T23:52:31.9955141Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 17/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd18 - data {"commandText":"Message No. 17"}
2021-10-18T23:52:31.9958093Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 18/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd19 - data {"commandText":"Message No. 18"}
2021-10-18T23:52:31.9961687Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 19/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd20 - data {"commandText":"Message No. 19"}
2021-10-18T23:52:31.9984223Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 20/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd21 - data {"commandText":"Message No. 20"}
2021-10-18T23:52:31.9987580Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 21/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd22 - data {"commandText":"Message No. 21"}
2021-10-18T23:52:32.0046054Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 22/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd23 - data {"commandText":"Message No. 22"}
2021-10-18T23:52:32.0070534Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 23/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd24 - data {"commandText":"Message No. 23"}
2021-10-18T23:52:32.0092722Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 24/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd25 - data {"commandText":"Message No. 24"}
2021-10-18T23:52:32.0155681Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 25/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd26 - data {"commandText":"Message No. 25"}
2021-10-18T23:52:32.0158509Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 26/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd27 - data {"commandText":"Message No. 26"}
2021-10-18T23:52:32.0212764Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 27/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd28 - data {"commandText":"Message No. 27"}
2021-10-18T23:52:32.0215682Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 28/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd29 - data {"commandText":"Message No. 28"}
2021-10-18T23:52:32.0404806Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 29/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd30 - data {"commandText":"Message No. 29"}
2021-10-18T23:52:32.0413058Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 30/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd31 - data {"commandText":"Message No. 30"}
2021-10-18T23:52:32.0602976Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 31/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd32 - data {"commandText":"Message No. 31"}
2021-10-18T23:52:32.0780547Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 32/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd33 - data {"commandText":"Message No. 32"}
2021-10-18T23:52:32.0800057Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 33/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd34 - data {"commandText":"Message No. 33"}
2021-10-18T23:52:32.0928815Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 34/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd35 - data {"commandText":"Message No. 34"}
2021-10-18T23:52:32.0978834Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 35/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd36 - data {"commandText":"Message No. 35"}
2021-10-18T23:52:32.1105230Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 36/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd37 - data {"commandText":"Message No. 36"}
2021-10-18T23:52:32.1126512Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 37/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd38 - data {"commandText":"Message No. 37"}
2021-10-18T23:52:32.2376984Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 38/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd39 - data {"commandText":"Message No. 38"}
2021-10-18T23:52:32.2395194Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 39/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd40 - data {"commandText":"Message No. 39"}
2021-10-18T23:52:32.2655808Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 40/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd41 - data {"commandText":"Message No. 40"}
2021-10-18T23:52:32.2822276Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 41/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd42 - data {"commandText":"Message No. 41"}
2021-10-18T23:52:32.2829551Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 42/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd43 - data {"commandText":"Message No. 42"}
2021-10-18T23:52:32.2964665Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 43/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd44 - data {"commandText":"Message No. 43"}
2021-10-18T23:52:32.2970636Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 44/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd45 - data {"commandText":"Message No. 44"}
2021-10-18T23:52:32.3134796Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 45/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd46 - data {"commandText":"Message No. 45"}
2021-10-18T23:52:32.3141598Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 46/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd47 - data {"commandText":"Message No. 46"}
2021-10-18T23:52:32.3443188Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 47/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd48 - data {"commandText":"Message No. 47"}
2021-10-18T23:52:32.3448780Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 48/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd49 - data {"commandText":"Message No. 48"}
2021-10-18T23:52:32.3591141Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 49/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd50 - data {"commandText":"Message No. 49"}
2021-10-18T23:52:32.3607982Z; Receiver 0; Connected; Received message - dCount 0 - msgId/corId/lockT /MsgId 50/82d1f9c1-fd4d-4c5f-8277-aa2e85c752dd51 - data {"commandText":"Message No. 50"}

程序.cs

    class Program
    {
        private static IoTHubConfiguration iotHubConfiguration;

        static async Task Main(string[] args)
        {
            // first we will create a new device and register it with iot hub
            // then we will mimic c2d msg flow and send a message(s). 
            // then start the receiver to get all the messages from device q mimicing essentially receiver down scenario and then coming up

            iotHubConfiguration = GetHubConfiguration();

            var deviceId = Guid.NewGuid();
            Console.WriteLine($"New Device Id - {deviceId}");

            var device = await RegisterDeviceWithIotHub(deviceId);

            await SendCloudToDeviceMessages(deviceId, 50);

            Console.WriteLine($"Press any key to receive");
            Console.ReadLine();

            await StartReceiver(deviceId, device);

            Console.WriteLine($"Finished.");
        }

        private static IoTHubConfiguration GetHubConfiguration()
        {
            var hc = new IoTHubConfiguration
            {
                IoTHubHostName = "***.azure-devices.net", // redacted
                SasPolicies = new List<SasPolicy>()
            };

            hc.SasPolicies.Add(new SasPolicy
            {
                KeyName = "iothubowner",
                AccessKey = "1hCa***=", // redacted
                Permissions = "owner"
            });

            hc.SasPolicies.Add(new SasPolicy
            {
                KeyName = "registryReadWrite",
                AccessKey = "LCf***95I=", // redacted
                Permissions = "registry"
            });

            hc.SasPolicies.Add(new SasPolicy
            {
                KeyName = "service",
                AccessKey = "yMA***Bk=", // redacted
                Permissions = "serviceconnect"
            });

            hc.SasPolicies.Add(new SasPolicy
            {
                KeyName = "device",
                AccessKey = "yM***Hc=", // redacted
                Permissions = "deviceconnect"
            });

            return hc;
        }

        private static async Task<AzureDevice> RegisterDeviceWithIotHub(Guid deviceId)
        {
            var registryManager = new IoTDeviceRegistryManager(iotHubConfiguration);
            return await registryManager.RegisterDevice(deviceId.ToString());
        }

        private static async Task SendCloudToDeviceMessages(Guid deviceId, int totalMessageCount)
        {
            var serviceClient = ServiceClient.CreateFromConnectionString(iotHubConfiguration.GetConnectionString(PermissionType.serviceconnect));

            var counter = 1;
            var sDeviceId = deviceId.ToString();
            while (counter != -1)
            {
                var commandText = $"Message No. {counter}";
                var messagePayload = $"{{\"commandText\":\"{commandText}\"}}";
                var eventMessage = new Message(Encoding.UTF8.GetBytes(messagePayload))
                {
                    ContentEncoding = Encoding.UTF8.ToString(),
                    ContentType = "application/json",
                    CorrelationId = $"MsgId {counter}",
                    ExpiryTimeUtc = DateTime.Now.AddHours(1)
                };

                try
                {
                    Console.WriteLine($"Sending {commandText}");
                    await serviceClient.SendAsync(sDeviceId, eventMessage);
                    if(counter == totalMessageCount)
                    {
                        counter = -1;
                    }
                    else
                    {
                        counter++;
                    }
                }
                catch (Exception e) // 51st message will cause a exception with error 403004 as c2d queue limit is 50
                {
                    Console.WriteLine(e);
                    counter = -1;
                }
            }
        }
        
        private static async Task StartReceiver(Guid deviceId, AzureDevice device)
        {
            Console.WriteLine("Press control-C to exit.");
            using var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                cts.Cancel();
                Console.WriteLine("Exiting...");
            };

            //try
            //{
            //    Parallel.For(0, 5, async i =>
            //    {
            //        await new CloudMessageReceiver(deviceId, device, i).ReceiveCommands(cts.Token);
            //    });
            //}
            //catch (Exception e)
            //{
            //    Console.WriteLine(e);
            //}

            Console.WriteLine("DateTime;Receiver;Status;Message");
            var receivers = new List<Task>();
            for (int i = 0; i < 1; i++)
            {
                receivers.Add(new CloudMessageReceiver(deviceId, device, i).ReceiveCommands(cts.Token));
            }

            await Task.WhenAll(receivers);
        }
    }

标签: azureazure-iot-hubazure-iot-sdk

解决方案


终于解开了谜团。以下序列有效:

  • 从服务客户端向 IoT 中心注册设备

  • 注册后立即模拟设备,连接到 IoT 中心并关闭它。

          var client = DeviceClient.CreateFromConnectionString(iotHubConfiguration.GetConnectionString(PermissionType.deviceconnect), deviceId, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
          await client.OpenAsync();
          await client.ReceiveAsync(TimeSpan.FromMilliseconds(1));
          await client.CloseAsync();
    
  • 现在发送消息并在设备端接收所有消息。

看起来,IoT Hub 确实在设备连接到它之前抢占了发送给它的消息。


推荐阅读