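java - Backpressure does not work with the groupBy operator
Problem description
I am trying to apply backpressure to my code, but it is not working. I tried the sample code given here, and it appears to work: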
Flux.range(1, 100)
    .doOnNext(d -> getLogger().info("receive record ::: {}", d))
    .flatMap(recordFlux -> Mono.delay(Duration.ofSeconds(30))
            .doOnNext(d -> getLogger().info("processed message :: {}", recordFlux))
            .then(Mono.just(recordFlux)),
        1)
    .subscribe();
This is the output I get:
2021-11-02 15:34:25.678 INFO 7456 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:34:55.682 INFO 7456 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:34:55.684 INFO 7456 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:35:25.685 INFO 7456 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:35:25.686 INFO 7456 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:35:55.687 INFO 7456 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:35:55.687 INFO 7456 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:36:25.690 INFO 7456 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
2021-11-02 15:36:25.691 INFO 7456 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:36:55.697 INFO 7456 --- [ parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 5
2021-11-02 15:36:55.698 INFO 7456 --- [ parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:37:25.704 INFO 7456 --- [ parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 6
2021-11-02 15:37:25.704 INFO 7456 --- [ parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:37:55.714 INFO 7456 --- [ parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 7
2021-11-02 15:37:55.714 INFO 7456 --- [ parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:38:25.720 INFO 7456 --- [ parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 8
2021-11-02 15:38:25.720 INFO 7456 --- [ parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:38:55.723 INFO 7456 --- [ parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 9
2021-11-02 15:38:55.723 INFO 7456 --- [ parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:39:25.726 INFO 7456 --- [ parallel-10] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 10
But when I add an extra groupBy on top of the flatMap, it does not work. What does adding groupBy do? How can I achieve the result above with the following code?
Flux.range(1, 100)
    .doOnNext(d -> getLogger().info("receive record ::: {}", d))
    .groupBy(m -> 1)
    .flatMap(consumerRecordFlux -> consumerRecordFlux
            .doOnNext(a -> getLogger().info("before process message :: partition ::{}, record ::{}", consumerRecordFlux.key(), a))
            .flatMap(b -> Mono.delay(Duration.ofSeconds(30))
                    .doOnNext(d -> getLogger().info("processed message :: {}", b))
                    .then(Mono.just(b)),
                1, 1),
        1, 1)
    .subscribe();
This is the incorrect output I get from the code above:
2021-11-02 15:51:44.827 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:51:44.829 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::1
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 11
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 12
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 13
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 14
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 15
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 16
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 17
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 18
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 19
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 20
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 21
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 22
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 23
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 24
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 25
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 26
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 27
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 28
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 29
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 30
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 31
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 32
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 33
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 34
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 35
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 36
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 37
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 38
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 39
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 40
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 41
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 42
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 43
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 44
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 45
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 46
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 47
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 48
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 49
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 50
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 51
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 52
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 53
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 54
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 55
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 56
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 57
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 58
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 59
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 60
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 61
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 62
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 63
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 64
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 65
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 66
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 67
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 68
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 69
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 70
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 71
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 72
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 73
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 74
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 75
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 76
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 77
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 78
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 79
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 80
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 81
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 82
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 83
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 84
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 85
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 86
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 87
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 88
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 89
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 90
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 91
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 92
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 93
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 94
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 95
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 96
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 97
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 98
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 99
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 100
2021-11-02 15:52:14.836 INFO 8071 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:52:14.837 INFO 8071 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::2
2021-11-02 15:52:44.843 INFO 8071 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:52:44.844 INFO 8071 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::3
2021-11-02 15:53:14.846 INFO 8071 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:53:14.847 INFO 8071 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::4
2021-11-02 15:53:44.852 INFO 8071 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
Solution
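One possible explanation, offered as a reading of the logs above rather than a confirmed answer: in the second run all 100 `receive record` lines appear before the first `processed message`, which is consistent with `groupBy` requesting its own prefetch from upstream regardless of how slowly the groups are consumed. Reactor's default for that prefetch is `Queues.SMALL_BUFFER_SIZE` (256 unless reconfigured), which is larger than the 100-element range. Reactor also has a `groupBy(keyMapper, prefetch)` overload, so passing a small prefetch such as `.groupBy(m -> 1, 1)` may be worth trying, though the exact interleaving it produces has not been verified here. The sketch below is plain Java, not Reactor: `PrefetchModel`, `run`, and the `emit`/`consume` labels are hypothetical names used only to model the demand accounting of a buffering stage.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model of a buffering stage that sits between
// a source and a one-at-a-time consumer. This is NOT Reactor's implementation;
// all names here are hypothetical and exist only for illustration.
class PrefetchModel {

    // Returns the order of events for a source of `sourceSize` items and a
    // stage that requests `prefetch` items up front, then one more each time
    // the consumer takes an item out of its buffer.
    static List<String> run(int sourceSize, int prefetch) {
        List<String> log = new ArrayList<>();
        Queue<Integer> buffer = new ArrayDeque<>();
        int next = 1;               // next item the source will emit
        int outstanding = prefetch; // demand the stage has signalled upstream

        while (true) {
            // The source emits as long as there is outstanding demand.
            while (outstanding > 0 && next <= sourceSize) {
                log.add("emit " + next);
                buffer.add(next++);
                outstanding--;
            }
            if (buffer.isEmpty()) {
                break; // source exhausted and buffer drained
            }
            // The slow consumer takes exactly one item at a time.
            log.add("consume " + buffer.poll());
            outstanding++; // the stage replenishes its upstream demand
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 256)); // prefetch larger than the source
        System.out.println(run(3, 1));   // prefetch of one
    }
}
```

In this model a prefetch of 256 drains the three-element source before anything is consumed, which mirrors the second log, while a prefetch of 1 makes emission and consumption alternate, as in the first log.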