How do I exit an Akka stream after receiving n elements?

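Problem description

I'm new to Akka and am just trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream, collect n messages, and then stop.

The only thing I have found that stops reading records is Sink.head(). But that returns only one record, and I want more.

I can't quite figure out how to stop reading from the stream after receiving n messages.

Here is the code I have tried so far: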

  @Test
  public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
        .httpClient(AkkaHttpClient.builder()
            .withActorSystem(system).build())
        .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead, // group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
  }

  private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

Thanks for any pointers.

Tags: java, akka, akka-stream, amazon-kinesis

Solution

Just to add to the answer you found: you can also express this more directly, without via:

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10); // completes the stream once 10 elements have passed
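
To see why take ends the stream while Sink.takeLast(10) in the question never returns, here is a minimal sketch in plain Java, with no Akka or Kinesis involved. It is only an analogy: endlessRecords, take and takeLast are hypothetical helpers written for this illustration, not Akka APIs. It assumes the Kinesis source keeps polling and does not complete on its own, so anything that has to see the end of the stream waits forever, whereas take(n) can finish as soon as n elements have arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class TakeVersusTakeLast {

  // Hypothetical stand-in for a source that never completes by itself.
  static Iterator<String> endlessRecords() {
    return new Iterator<String>() {
      private int counter = 1;

      @Override
      public boolean hasNext() {
        return true; // there is always "another record"
      }

      @Override
      public String next() {
        return "record-" + counter++;
      }
    };
  }

  // Analogous to take(n): stops pulling as soon as n elements have arrived.
  static <T> List<T> take(Iterator<T> source, int n) {
    List<T> out = new ArrayList<>();
    while (out.size() < n && source.hasNext()) {
      out.add(source.next());
    }
    return out;
  }

  // Analogous to a takeLast(n) sink: it must reach the end of the source
  // before it can answer, so it would loop forever on endlessRecords().
  static <T> List<T> takeLast(Iterator<T> source, int n) {
    Deque<T> window = new ArrayDeque<>();
    while (source.hasNext()) {
      window.addLast(source.next());
      if (window.size() > n) {
        window.removeFirst(); // keep only the most recent n elements
      }
    }
    return new ArrayList<>(window);
  }

  public static void main(String[] args) {
    // Terminates even though the source is endless.
    System.out.println(take(endlessRecords(), 3));
    // Only terminates because this source is finite.
    System.out.println(takeLast(Arrays.asList("a", "b", "c", "d").iterator(), 2));
  }
}
```

Carried back to the Akka code, this suggests putting .take(numberOfRecordsToRead) on the source and collecting with Sink.seq() (already present, commented out, in the question), so that the CompletionStage can complete once n records have been read.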
