首页 > 解决方案 > 在 micronaut 中启动异步辅助线程(或回调)的正确方法

问题描述

我想从像 Google 的PubSub这样的服务接收实时回调,然后使用 Micronaut 服务提供这些消息 - 即,让请求特定端点的人可以使用这些消息。

为了达到这个结果,我有一个Controller拥有一个 Bean。(singleton) bean 负责初始化与消息服务的关系并实现,ApplicationEventListener<ServiceStartedEvent>以便我可以在onApplicationEvent()该接口的方法中执行消息服务初始化。我用 注释这个方法@Async,理解这意味着 bean 将在线程池上执行,我认为这很重要,因为这个 bean 由实现MessageReceiver接口的静态类辅助;此接口由 PubSub 框架异步调用,以传递我订阅的消息。

因此,Bean 会进入一个 while 循环,以便于从接口异步回调到receiveMessage()方法。MessageReceiver我使用了线程安全的 Java BlockingQueue接口之一,以便在调用阻塞take()方法时将我的 while 循环安排在堆栈之外(而不是 a Thread.sleep())。

所以,我的问题是:

  1. 这是处理异步回调的正确方法,还是会消耗 Micronaut 通常会使用的资源 - 即,onApplicationEvent()永远不会“返回”是否可以?(下面的代码有效,但我没有分析它)
  2. bean 是否应该拥有/使用TaskSchedulerorExecutorService来创建此行为,还是这样@Async做?(我已经阅读了指南,这TaskScheduler似乎是一种选择)
  3. 这种方法是否可以扩展 - 即,如果我订阅了多个主题,每个消息主题是否应该有一个 Bean?

这是 bean 代码,它在我的测试服务器上运行良好:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;

import javax.inject.Singleton;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.discovery.event.ServiceStartedEvent;
import io.micronaut.scheduling.annotation.Async;

@Singleton
public class PubsubManager implements ApplicationEventListener<ServiceStartedEvent> {
    private static final Logger log = LoggerFactory.getLogger(PubsubManager.class);
    // use the default project id
    private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
    private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
    private static final ConcurrentLinkedQueue<PubsubMessage> latestMessages = new ConcurrentLinkedQueue<>();

    private String subscriptionId = "sub1";
    private Subscriber subscriber = null;

    static class PubsubReceiver implements MessageReceiver {
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            log.info("received message id: " + message.getMessageId());
            latestMessages.add(message);
            //local queue for routing to db
            messages.offer(message);
            consumer.ack();
        }
    }

    @Async
    @Override
    public void onApplicationEvent(final ServiceStartedEvent event) {
        log.info("Loading pubsub at startup");
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
        // create a subscriber bound to the asynchronous message receiver
        subscriber = Subscriber.newBuilder(subscriptionName, new PubsubReceiver()).build();
        subscriber.startAsync().awaitRunning();
        log.info("started pubsub");
        // Continue to listen to messages
        try {
            while (true) {
                PubsubMessage message = messages.take();//this schedules us off stack
                log.info("Message Id: " + message.getMessageId());
                log.info("Data: " + message.getData().toStringUtf8());
                log.info("as string: " + message.toString());
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }

    public static ConcurrentLinkedQueue<PubsubMessage> getLatestMessages() {
        return latestMessages;
    }

}

标签: javamicronaut

解决方案


推荐阅读