首页 > 解决方案 > javascript异步任务处理

问题描述

我是 Node 开发的新手。我无法理解 JS 和 Node.js 的异步特性。我正在构建一个微服务后端 Web 服务器,在网关服务中运行 Express,它将 REST 请求转换为使用 RabbitMQ 异步消息传递 (amqplib) 模块发布的一系列消息,而其他服务可以订阅、处理请求然后响应.

我处理来自网关的异步请求的服务如下所示:

amqp.connect('amqp://172.17.0.2', function(err, conn) {
  console.log("connection created");
  conn.createChannel(function(err, ch) {
    console.log("channel created");
    var exchange = 'main';

    ch.assertExchange(exchange, 'topic', {durable: true});

    ch.assertQueue('', {exclusive: true}, function(err, q) {
      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      ch.bindQueue(q.queue, exchange, "VENUE_TOPIC.PENDING_STATUS.*.*");

      ch.consume(q.queue, function(msg) { 
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
        var pending_event = JSON.parse(msg.content.toString())
        console.log(pending_event.payload.id == 2)
        console.log(pending_event.payload.id)
        if (pending_event.payload.id == 1) { 
          var venue = getVenueByID(pending_event.payload.id);
          const approved_event = new Event("VENUE_TOPIC", "APPROVED_STATUS", false, "READ_METHOD", {"venue":venue});
          var key = approved_event.getMetaAsTopics();

          var msg = Buffer.from(JSON.stringify(approved_event.getEventAsJSON()));

          ch.assertExchange(exchange, 'topic', {durable: true});
          ch.publish(exchange, key, msg, {persistent: true});
          console.log(" [x] Sent '%s'", msg);        
        } else if (pending_event.payload.id == 2) {
          sleep(10000); //this function checks the OS's clock to count time in ms
          var venue = getVenueByID(pending_event.payload.id);
          const approved_event = new Event("VENUE_TOPIC", "APPROVED_STATUS", false, "READ_METHOD", {"venue":venue}); 
          var key = approved_event.getMetaAsTopics();

          var msg = Buffer.from(JSON.stringify(approved_event.getEventAsJSON()));

          ch.assertExchange(exchange, 'topic', {durable: true});
          ch.publish(exchange, key, msg, {persistent: true});
          console.log(" [x] Sent '%s'", msg);
        }
      }, {noAck: true});
    });
  });
});

假设我有 2 个请求,一个需要很长时间才能完成,另一个需要更短的时间。较长的请求在较短的请求之前出现。在我的示例中,长进程是 ID === 2,短进程是 ID === 1。

现在,如果我发送一个 ID 为 2 的请求,然后立即发送一个 ID 为 1 的请求,我必须等待 10 秒才能完成第一个请求,然后另一个完成。

我无法理解是否可以同时处理两个请求,而不会让长进程阻塞短进程直到它完成。

标签: javascriptexpressasynchronousrabbitmq

解决方案


由于我没有足够的 SO 声誉,因此我无法发表评论,但我会尝试提供尽可能多的见解。

假设这个sleepnpm 包是预期的行为,即所谓的行为。如果您计划在您的消费函数中进行非阻塞调用(如 Web 请求),您会很好,节点将能够在处理时消费更多消息。但是,如果您需要进行阻塞/cpu 繁重的工作并且仍然能够处理传入的消息,则您需要将这项工作卸载给一些工作人员。blocking

试试这个代码,它会“休眠”而不阻塞:

setTimeout(() => {
    var venue = getVenueByID(pending_event.payload.id);
    const approved_event = new Event("VENUE_TOPIC", "APPROVED_STATUS", false, "READ_METHOD", {
        "venue": venue
    });
    var key = approved_event.getMetaAsTopics();

    var msg = Buffer.from(JSON.stringify(approved_event.getEventAsJSON()));

    ch.assertExchange(exchange, 'topic', {
        durable: true
    });
    ch.publish(exchange, key, msg, {
        persistent: true
    });
    console.log(" [x] Sent '%s'", msg);
}, 10000);

推荐阅读