java - 收到n个元素后如何退出akka流?
问题描述
我是 Akka 的新手,我只是想掌握它。
作为一个实验,我想从 Kinesis 流中读取并收集 n 条消息并停止。
我发现唯一会停止读取记录的是 Sink.head()。但这只会返回一条记录,我想获得更多。
我不太明白如何在收到 n 条消息后停止从流中读取。
这是我到目前为止尝试过的代码
@Test
public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
final ActorSystem system = ActorSystem.create("foo");
final Materializer materializer = ActorMaterializer.create(system);
ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();
final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
.credentialsProvider(profileCredentialsProvider)
.region(Region.US_WEST_2)
.httpClient(AkkaHttpClient.builder()
.withActorSystem(system).build())
.build();
system.registerOnTermination(kinesisClient::close);
String streamName = "akka-test-stream";
String shardId = "shardId-000000000000";
int numberOfRecordsToRead = 3;
final ShardSettings settings = ShardSettings.create(streamName, shardId)
.withRefreshInterval(Duration.ofSeconds(1))
.withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
.withShardIterator(ShardIterators.latest());
final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);
Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
// Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
// Flow.of(String.class).groupedWithin(
// numberOfRecordsToRead, // group size
// Duration.ofSeconds(60) // group time
// );
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
.via(flowMapRecordToString)
.via(flowPrinter);
// .via(flowGroupedWithinMinute); // nope
// sink to list of strings
// Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
// Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message
CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
.runWith(sink10, materializer);
CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
completableFuture.join(); // never stops running...
List<String> result = completableFuture.get();
int foo = 1;
}
private String extractDataFromRecord(Record record) {
String encType = record.encryptionTypeAsString();
Instant arrivalTimestamp = record.approximateArrivalTimestamp();
String data = record.data().asString(StandardCharsets.UTF_8);
return data;
}
private String debugPrint(String s) {
System.out.println(s);
return s;
}
谢谢你的任何线索
解决方案
只是为了补充您找到的答案,也可以更直接地表达事物,而无需via
:
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
.map(record -> extractDataFromRecord(record))
.map(s -> debugPrint(s))
.take(10)
推荐阅读
- javascript - 如何在同一个组件中使用道具?
- java - 有没有其他方法可以删除字符串中的所有空格?
- python - Python 安装证书命令
- flutter - 如何在颤振集成测试中查看故障线的行数?
- android - Android:分配一次图形并在屏幕上移动它而不重写像素
- c# - Eventstore:如何正确设置持久订阅客户端?
- vector - clojure 中向量的相等性
- angular - 在 Angular 中自定义布尔玛
- python - Plotly 和 GeoPandas - 可以 Plotly 绘制多边形几何图形吗?
- google-analytics - 谷歌分析标签不适用于带有 GH 页面的 Jekyll