Sending a message to a connected IoT device when data arrives in an Azure queue?

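Problem description

I have an Azure IoT hub that some Arduino devices connect to from time to time. I also have an Azure queue that can contain JSON strings I want to send to individual devices when they connect.

I followed an example to create a WebJob that is invoked when data arrives in the queue. I also followed an example that schedules a job to send data to an IoT device when it connects. Run separately, both work fine.

I combined the examples so that when ProcessQueueMessage() is invoked because something has arrived in the queue, I schedule a job to send that data to the IoT device.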

public static async Task ProcessQueueMessage([QueueTrigger("inbox")] string message, ILogger logger)
{
  logger.LogInformation(message);
  Message = message;

  string methodJobId = Guid.NewGuid().ToString();
  await StartMethodJob(methodJobId, Message);
  MonitorJob(methodJobId).Wait();
}

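However, doing this results in a System.NullReferenceException, and I can't work out why. All of the functions are static, and if I step through the code I can see that the parameters being passed in have values. As a result, the queue item is moved to another queue, "inbox-poison".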

info: Function.ProcessQueueMessage.User[0]
      {"TransId":3517,"DeviceId":"Huzz_2_2_2","PriceAlert":{"Price":"High","Variance":0.0,"DisplaySetting":1,"Duration":60}}
fail: Function.ProcessQueueMessage[0]
      Executed 'Functions.ProcessQueueMessage' (Failed, Id=556812ab-4a58-4ae1-9f4f-6336962d92d2)
Microsoft.Azure.WebJobs.Host.FunctionInvocationException: Exception while executing function: Functions.ProcessQueueMessage ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at IncomeEligible.Alerts.Functions.StartMethodJob(String jobId, String json) in C:\source\repos\Income_Eligible_Price_Display\src\IncomeEligible.Alerts\IncomeEligible.Alerts\Functions.cs:line 46
   at IncomeEligible.Alerts.Functions.ProcessQueueMessage(String message, ILogger logger) in C:\source\repos\Income_Eligible_Price_Display\src\IncomeEligible.Alerts\IncomeEligible.Alerts\Functions.cs:line 80
   at Microsoft.Azure.WebJobs.Host.Executors.VoidTaskMethodInvoker`2.InvokeAsync(TReflected instance, Object[] arguments) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\VoidTaskMethodInvoker.cs:line 20
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionInvoker`2.InvokeAsync(Object instance, Object[] arguments) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionInvoker.cs:line 52
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.InvokeAsync(IFunctionInvoker invoker, ParameterHelper parameterHelper, CancellationTokenSource timeoutTokenSource, CancellationTokenSource functionCancellationTokenSource, Boolean throwOnTimeout, TimeSpan timerInterval, IFunctionInstance instance) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 584
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithWatchersAsync(IFunctionInstanceEx instance, ParameterHelper parameterHelper, ILogger logger, CancellationTokenSource functionCancellationTokenSource) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 531
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithLoggingAsync(IFunctionInstanceEx instance, ParameterHelper parameterHelper, IFunctionOutputDefinition outputDefinition, ILogger logger, CancellationTokenSource functionCancellationTokenSource) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 467
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithLoggingAsync(IFunctionInstanceEx instance, FunctionStartedMessage message, FunctionInstanceLogEntry instanceLogEntry, ParameterHelper parameterHelper, ILogger logger, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 277
   --- End of inner exception stack trace ---
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithLoggingAsync(IFunctionInstanceEx instance, FunctionStartedMessage message, FunctionInstanceLogEntry instanceLogEntry, ParameterHelper parameterHelper, ILogger logger, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 321
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.TryExecuteAsyncCore(IFunctionInstanceEx functionInstance, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 117

Here is my program:

using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace IncomeEligible.Alerts
{
  public class Functions
  {
    private static JobClient jobClient;
    private static int Count = 0;
    private static string Message = "";

    private static string deviceId = "Huzzah_w_DHT22";

    private const string DeviceConnectionString =
      "HostName=...";

    private const string HubConnectionString =
      "HostName=...";

    public static async Task MonitorJob(string jobId)
    {
      JobResponse result;
      do
      {
        result = await jobClient.GetJobAsync(jobId);
        Console.WriteLine("Job Status : " + result.Status.ToString());
        Thread.Sleep(2000);
      } while ((result.Status != JobStatus.Completed) &&
               (result.Status != JobStatus.Failed));
    }

    public static async Task StartMethodJob(string jobId, string json)
    {
      CloudToDeviceMethod directMethod =
        new CloudToDeviceMethod("SetPriceAlert", TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

      directMethod.SetPayloadJson(json);

      JobResponse result = await jobClient.ScheduleDeviceMethodAsync(jobId,
        $"DeviceId IN ['{deviceId}']",
        directMethod,
        DateTime.UtcNow,
        (long)TimeSpan.FromMinutes(2).TotalSeconds);

      Console.WriteLine("Started Method Job");
    }

    public static async Task ProcessQueueMessage([QueueTrigger("inbox")] string message, ILogger logger)
    {
      logger.LogInformation(message);
      Message = message;

      string methodJobId = Guid.NewGuid().ToString();
      await StartMethodJob(methodJobId, Message);
      MonitorJob(methodJobId).Wait();

    }

  }
}

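If this worked, would the approach make sense, or would it be better to set up an Event Hub trigger or IoT Hub trigger that detects when a device connects and calls a function to send the JSON to the connected device?

Thanks for your help!

Tags: c#, arduino, azure-iot-hub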

Solution


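jobClient is never assigned a value, so it is still null when StartMethodJob calls ScheduleDeviceMethodAsync on it. That is the NullReferenceException the stack trace reports in StartMethodJob.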
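One way to make sure the client exists before it is used is to create it from the hub connection string. The following is a minimal sketch, not the asker's actual fix: the JobClientHolder class and its Instance property are hypothetical names introduced for illustration, and the connection string stays elided exactly as in the question. It assumes the Microsoft.Azure.Devices package that the question already references.

```csharp
using System;
using Microsoft.Azure.Devices;

// Hypothetical helper (not part of the original program) that owns the JobClient.
public static class JobClientHolder
{
    // Placeholder copied from the question; substitute the IoT hub (service) connection string.
    private const string HubConnectionString = "HostName=...";

    // Lazy<T> defers creation until first use, so a bad connection string
    // fails inside the function invocation rather than in a type initializer.
    private static readonly Lazy<JobClient> LazyJobClient =
        new Lazy<JobClient>(() => JobClient.CreateFromConnectionString(HubConnectionString));

    // Shared instance, created on first access.
    public static JobClient Instance => LazyJobClient.Value;
}
```

With something like this in place, StartMethodJob and MonitorJob could use JobClientHolder.Instance instead of the unassigned static field. Assigning the field directly with JobClient.CreateFromConnectionString(HubConnectionString) before the first call would work as well.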

Also, you don't want to mix async code with blocking calls such as .Wait() and Thread.Sleep(). Use await and await Task.Delay() instead.
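The polling loop in MonitorJob can be written without blocking a thread. The sketch below shows the general pattern using only the standard library; the Polling class and the PollUntilAsync helper are hypothetical names introduced for illustration, not part of the question's code or of any Azure SDK.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper illustrating non-blocking polling.
public static class Polling
{
    // Awaits getStatus repeatedly until isDone returns true for the result.
    // Task.Delay yields the thread between attempts, unlike Thread.Sleep.
    public static async Task<T> PollUntilAsync<T>(
        Func<Task<T>> getStatus,
        Func<T, bool> isDone,
        TimeSpan interval)
    {
        while (true)
        {
            T status = await getStatus();
            if (isDone(status))
            {
                return status;
            }

            await Task.Delay(interval);
        }
    }
}
```

In the question's program, MonitorJob could pass () => jobClient.GetJobAsync(jobId) as getStatus and a check for JobStatus.Completed or JobStatus.Failed as isDone, and ProcessQueueMessage would then call await MonitorJob(methodJobId) rather than MonitorJob(methodJobId).Wait().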