java - 在 micronaut 中启动异步辅助线程(或回调)的正确方法
问题描述
我想从像 Google 的PubSub这样的服务接收实时回调,然后使用 Micronaut 服务提供这些消息 - 即,让请求特定端点的人可以使用这些消息。
为了达到这个结果,我有一个Controller
拥有一个 Bean。(singleton) bean 负责初始化与消息服务的关系并实现,ApplicationEventListener<ServiceStartedEvent>
以便我可以在onApplicationEvent()
该接口的方法中执行消息服务初始化。我用 注释这个方法@Async
,理解这意味着 bean 将在线程池上执行,我认为这很重要,因为这个 bean 由实现MessageReceiver接口的静态类辅助;此接口由 PubSub 框架异步调用,以传递我订阅的消息。
因此,Bean 会进入一个 while 循环,以便于从接口异步回调到receiveMessage()
方法。MessageReceiver
我使用了线程安全的 Java BlockingQueue接口之一,以便在调用阻塞take()
方法时将我的 while 循环安排在堆栈之外(而不是 a Thread.sleep()
)。
所以,我的问题是:
- 这是处理异步回调的正确方法,还是会消耗 Micronaut 通常会使用的资源 - 即,
onApplicationEvent()
永远不会“返回”是否可以?(下面的代码有效,但我没有分析它) - bean 是否应该拥有/使用
TaskScheduler
orExecutorService
来创建此行为,还是这样@Async
做?(我已经阅读了指南,这TaskScheduler
似乎是一种选择) - 这种方法是否可以扩展 - 即,如果我订阅了多个主题,每个消息主题是否应该有一个 Bean?
这是 bean 代码,它在我的测试服务器上运行良好:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.inject.Singleton;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.discovery.event.ServiceStartedEvent;
import io.micronaut.scheduling.annotation.Async;
@Singleton
public class PubsubManager implements ApplicationEventListener<ServiceStartedEvent> {
private static final Logger log = LoggerFactory.getLogger(PubsubManager.class);
// use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
private static final ConcurrentLinkedQueue<PubsubMessage> latestMessages = new ConcurrentLinkedQueue<>();
private String subscriptionId = "sub1";
private Subscriber subscriber = null;
static class PubsubReceiver implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
log.info("received message id: " + message.getMessageId());
latestMessages.add(message);
//local queue for routing to db
messages.offer(message);
consumer.ack();
}
}
@Async
@Override
public void onApplicationEvent(final ServiceStartedEvent event) {
log.info("Loading pubsub at startup");
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
// create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(subscriptionName, new PubsubReceiver()).build();
subscriber.startAsync().awaitRunning();
log.info("started pubsub");
// Continue to listen to messages
try {
while (true) {
PubsubMessage message = messages.take();//this schedules us off stack
log.info("Message Id: " + message.getMessageId());
log.info("Data: " + message.getData().toStringUtf8());
log.info("as string: " + message.toString());
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
public static ConcurrentLinkedQueue<PubsubMessage> getLatestMessages() {
return latestMessages;
}
}
解决方案
推荐阅读
- swift - 从 Firebase 检索无价值的数据
- json.net - 如何解析“uuuu'-'MM'-'dd'T'HH':'mm':'ss;FFFFFFFFFo
“变成瞬间? - java - 来自不同函数的线程中断
- eclipse - 防止日食跳过自动添加的右括号
- java - 使用 Coherence jar 运行多播测试时出错
- ios - 当所有文本字段都填写在tableviewcell(SWIFT)中时如何启用按钮
- grails - GGTS 下载页面的 404
- .htaccess - `%{HTTP_HOST}` 的重写条件被忽略了吗?
- testing - 尝试使用 TCMS API 更新 TestCase
- unity3d - Unity ScrollRect / ScrollView 优化 / 性能我学到了什么