java - 提交方法不会调用 onNext FLOW STREAM API JAVA
问题描述
我正在学习 Java 中的 FLOW Stream API,我目前正在创建一个基于oracle community上的示例。问题是我看不到预期的输出,而只是在onSubscribe
方法内部打印的 SUBSCRIBING 字符串。我已经检查并在 StackOverflow 上找到了submissionpublisher-on-submit-not-invoking-onnext-of-subscriber,但没有工作,因为我已经在调用request(Long N)
.
import java.util.concurrent.Flow;
public class Computer<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("SUBSCRIBING");
this.subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println(String.format("Got %s", item.toString()));
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("DONE");
}
}
--
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
public class Sensor {
public static void main(String[] args) {
SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>();
Computer<String> subscriber = new Computer<>();
submissionPublisher.subscribe(subscriber);
List<String> items = List.of("1.25", "1.224", "1.55");
items.forEach(submissionPublisher::submit);
submissionPublisher.close();
}
}
我只是看到:
SUBSCRIBING
为什么onNext
方法没有被调用?
解决方案
您没有将 a 传递ScheduledExecutorService
给Publisher
它,它基本上是一个 ExecutorService ,它可以安排任务在延迟后运行或在每次执行之间以固定的时间间隔重复执行。
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
public class Sensor {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, 5);
Computer<String> subscriber = new Computer<>();
submissionPublisher.subscribe(subscriber);
List<String> items = List.of("1.25", "1.224", "1.55");
items.forEach(submissionPublisher::submit);
submissionPublisher.close();
executor.shutdown();
}
}
推荐阅读
- c++ - c++ 函数检查树是否已满的问题
- python - 我正在为烧瓶使用 docker 并且 pytesseract 容器正在运行但无法访问浏览器上的页面
- regex - 为多种模式编写正则表达式
- sql - SQL 查询以获取没有关联服务的用户
- java - java中构造函数链接的意义何在以及如何将它与tostring()结合起来?
- neo4j - 这个 Cypher 查询中缺少什么?
- linux - Linux 与 Windows 中的系统库
- c# - 根据用户对一系列问题的回答隐藏 Word 文档中的部分
- javascript - 使用用户名获取文档参考
- c - 在 C99 的除法中使用指针,错误:二进制操作数无效