Windowing is not triggered when we deploy the Flink application to Kinesis Data Analytics

Problem description

We have an Apache Flink POC application that works fine locally, but after we deploy it to Kinesis Data Analytics (KDA), it does not emit any records to the sink.

Technologies used

Local

AWS

Application logic

  1. A FlinkKafkaConsumer reads the messages in JSON format from the topic
  2. The JSON is mapped to a domain object called Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  3. Telemetry's TimeStamp field is selected as the event timestamp
    3.1. using forMonotonousTimeStamps
  4. Telemetry's StateIso field is used for keyBy
    4.1. the two-letter ISO code of a US state
  5. A 5-second tumbling window strategy is applied
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  6. A custom ProcessWindowFunction is called to perform some basic aggregation.
    6.1. We compute a single StateAggregatedTelemetry per window (a sketch of this function, under stated assumptions, follows the sink code below)
  7. ElasticSearch is configured as a sink.
    7.1. The StateAggregatedTelemetry data is mapped into a HashMap and pushed as the document source.
    7.2. All the setBulkFlushXYZ methods are set to low values
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();

                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}
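
For reference, here is a minimal sketch of what WindowCountFunction and StateAggregatedTelemetry could look like. This is an assumption reconstructed from the field names used in the sink mapping above (StateIso, Flawless, Faulty); the author's actual aggregation logic was not posted, and the Healthy flag on Telemetry is hypothetical.

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

//Hypothetical sketch -- reconstructed from the sink mapping, not the author's code
public class WindowCountFunction
        extends ProcessWindowFunction<Telemetry, StateAggregatedTelemetry, String, TimeWindow> {

    @Override
    public void process(String stateIso,
                        Context context,
                        Iterable<Telemetry> elements,
                        Collector<StateAggregatedTelemetry> out) {
        //One aggregate per key (state ISO code) per 5-second window
        StateAggregatedTelemetry aggregate = new StateAggregatedTelemetry();
        aggregate.StateIso = stateIso;
        for (Telemetry telemetry : elements) {
            if (telemetry.Healthy) { //Hypothetical flag, not part of the posted Telemetry class
                aggregate.Flawless++;
            } else {
                aggregate.Faulty++;
            }
        }
        out.collect(aggregate);
    }
}

public class StateAggregatedTelemetry {
    public String StateIso;
    public long Flawless;
    public long Faulty;
}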

Things we have ruled out

The Elasticsearch connection itself works: at startup the sink pings the cluster and receives a successful response.

{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Response

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}

Things we tried without luck


We assume that we don't see any data on the sink side because the window processing logic is never triggered. That would also explain why we don't see the processing logs in CloudWatch.

Any help would be appreciated!


Update #1


Update #2

The average message size is around 4 KB. Here is an excerpt from a sample message:

{
  "affiliateCode": "...",
  "appVersion": "1.1.14229",
  "clientId": "guid",
  "clientIpAddr": "...",
  "clientOriginated": true,
  "connectionType": "Cable/DSL",
  "countryCode": "US",
  "design": "...",
  "device": "...",
  ...
  "deviceSerialNumber": "...",
  "dma": "UNKNOWN",
  "eventSource": "...",
  "firstRunTimestamp": 1609091112818,
  "friendlyDeviceName": "Comcast",
  "fullDevice": "Comcast ...",
  "geoInfo": {
    "continent": {
      "code": "NA",
      "geoname_id": 120
    },
    "country": {
      "geoname_id": 123,
      "iso_code": "US"
    },
    "location": {
      "accuracy_radius": 100,
      "latitude": 37.751,
      "longitude": -97.822,
      "time_zone": "America/Chicago"
    },
    "registered_country": {
      "geoname_id": 123,
      "iso_code": "US"
    }
  },
  "height": 720,
  "httpUserAgent": "Mozilla/...",
  "isLoggedIn": true,
  "launchCount": 19,
  "model": "...",
  "os": "Comcast...",
  "osVersion": "...",
  ...
  "platformTenantCode": "...",
  "productCode": "...",
  "requestOrigin": "https://....com",
  "serverTimeUtc": 1617809474787,
  "serviceCode": "...",
  "serviceOriginated": false,
  "sessionId": "guid",
  "sessionSequence": 2,
  "subtype": "...",
  "tEventId": "...",
  ...
  "tRegion": "us-east-1",
  "timeZoneOffset": 5,
  "timestamp": 1617809473305,
  "traits": {
    "isp": "Comcast Cable",
    "organization": "..."
  },
  "type": "...",
  "userId": "guid",
  "version": "v1",
  "width": 1280,
  "xb3traceId": "guid"
}

We only use ObjectMapper to parse some fields of the JSON. Here is what the Telemetry class looks like:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}
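
For completeness, here is a minimal sketch of how the MapJsonToTelemetry function referenced in SetupKafkaSource might parse these fields with Jackson's ObjectMapper. The field mapping follows the sample message above, but the actual implementation was not posted, so treat this as an assumption.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

//Hypothetical sketch of the mapper used in SetupKafkaSource
public class MapJsonToTelemetry implements MapFunction<String, Telemetry> {
    //Shared parser instance; kept static so it is not serialized with the function
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Telemetry map(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        Telemetry telemetry = new Telemetry();
        telemetry.AppVersion = root.path("appVersion").asText();
        telemetry.CountryCode = root.path("countryCode").asText();
        telemetry.ClientId = root.path("clientId").asText();
        telemetry.TimeStamp = root.path("timestamp").asLong();
        //... the remaining fields, including StateIso, are filled the same way
        return telemetry;
    }
}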

Update #3

Source operator

Subtasks tab

ID | Bytes Received | Records Received | Bytes Sent | Records Sent | Status
0  | 0 B            | 0                | 0 B        | 0            | RUNNING
1  | 0 B            | 0                | 2.83 MB    | 15,000       | RUNNING
Watermarks tab

No data

Window operator

Subtasks tab

ID | Bytes Received | Records Received | Bytes Sent | Records Sent | Status
0  | 1.80 MB        | 9,501            | 0 B        | 0            | RUNNING
1  | 1.04 MB        | 5,499            | 0 B        | 0            | RUNNING

Watermarks tab

SubTask | Watermark
1       | No Watermark
2       | No Watermark

Tags: java, elasticsearch, apache-kafka, apache-flink, amazon-kinesis-analytics

Solution


Based on the comments and the additional information you provided, it seems that the problem is that two Flink consumers cannot consume from the same Kafka partition. So in your case, only one parallel instance of the source operator actually consumes from the Kafka partition, while the other one sits idle.

In general, a Flink operator picks MIN([all_upstream_parallel_watermarks]) as its current watermark. In your case one Kafka consumer produces normal watermarks, while the other one never produces anything (Flink assumes Long.MIN_VALUE in that case), so Flink picks the lower one, which is Long.MIN_VALUE. The window is therefore never fired: even though data is flowing, one of the two watermarks is never generated. A good practice when working with Kafka is to use a source parallelism equal to the number of Kafka partitions.
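
To illustrate, here is a sketch of the two usual remedies, assuming Flink 1.11+ and a single-partition topic, and reusing the environment, consumer, and Telemetry names from the question; neither snippet is from the original application:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

//Option 1: match the source parallelism to the number of Kafka partitions
//(assumed to be 1 here), so no consumer subtask sits idle.
DataStream<Telemetry> telemetries = environment
        .addSource(consumer).setParallelism(1)
        .map(new MapJsonToTelemetry());

//Option 2 (Flink 1.11+): mark sources idle after a timeout, so an idle
//subtask stops holding the combined watermark at Long.MIN_VALUE.
WatermarkStrategy<Telemetry> wmStrategy =
        WatermarkStrategy
                .<Telemetry>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.TimeStamp)
                .withIdleness(Duration.ofSeconds(30));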

