hazelcast-jet - Hazelcast Jet 0.6.1 pipeline and DAG definition
Problem description
I have the following sample code that builds a pipeline:
private Pipeline buildPipeline() {
    logger.debug("AbstractAuditLogProcessor.buildPipeline method start");
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getPlatformClientConfig(), START_FROM_OLDEST))
            .addTimestamps((v) -> getTimeStamp(v), 3000)
            .peek()
            .groupingKey((v) -> Tuple2.tuple2(getUserID(v), getTranType(v)))
            .window(WindowDefinition.sliding(getSlidingWindowLengthInMillies(), getSlidingStepInMillies()))
            .aggregate(counting())
            .map((v) -> getMapKey(v))
            //.<Map.Entry<String, Long>>customTransform("test2", () -> this)
            //.<Offer>customTransform("Offer_Recommendations", () -> this)
            .<Map.Entry<String, Offer>>customTransform("Offer_Recommendations", () -> this)
            //.drainTo(Sinks.remoteList("cache_OfferRecommendations", getPlatformClientConfig()));
            .drainTo(Sinks.remoteMap("cache_OfferRecommendations", getPlatformClientConfig()));
    logger.debug("AbstractAuditLogProcessor.buildPipeline method end");
    return p;
}
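The groupingKey(...).window(sliding(...)).aggregate(counting()) chain counts records per key in every sliding window, so one record contributes to several overlapping windows. A minimal stdlib-only sketch of that counting, assuming windows of the form [end - windowLength, end) whose ends are aligned to multiples of the slide step (a simplified model, not Jet's actual code; `windowEndsFor`, `countPerKeyAndWindow`, `event`, and the "key@windowEnd" result format are hypothetical):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingWindowSketch {

    /** Hypothetical helper: an event is a (key, timestamp in ms) pair. */
    static Map.Entry<String, Long> event(String key, long ts) {
        return new AbstractMap.SimpleEntry<>(key, ts);
    }

    /** Returns the end timestamps of every window [end - windowLength, end) that contains ts. */
    static List<Long> windowEndsFor(long ts, long windowLength, long slideBy) {
        List<Long> ends = new ArrayList<>();
        // First window end strictly after ts, aligned to a multiple of slideBy.
        long firstEnd = Math.floorDiv(ts, slideBy) * slideBy + slideBy;
        // Keep moving the window forward while its start is still <= ts.
        for (long end = firstEnd; end - windowLength <= ts; end += slideBy) {
            ends.add(end);
        }
        return ends;
    }

    /** Counts events per (key, window end), modelling a keyed sliding-window counting() aggregation. */
    static Map<String, Long> countPerKeyAndWindow(List<Map.Entry<String, Long>> events,
                                                  long windowLength, long slideBy) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Long> e : events) {
            for (long end : windowEndsFor(e.getValue(), windowLength, slideBy)) {
                counts.merge(e.getKey() + "@" + end, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical keys of the form "userId|tranType".
        List<Map.Entry<String, Long>> events = Arrays.asList(
                event("alice|LOGIN", 500L),
                event("alice|LOGIN", 1500L),
                event("bob|PAY", 1500L));
        // 2000 ms windows sliding by 1000 ms: each event falls into 2 windows.
        System.out.println(countPerKeyAndWindow(events, 2000L, 1000L));
        // prints {alice|LOGIN@1000=1, alice|LOGIN@2000=2, alice|LOGIN@3000=1, bob|PAY@2000=1, bob|PAY@3000=1}
    }
}
```

With windowLength a multiple of slideBy, each record lands in exactly windowLength / slideBy windows, which is why sliding windows cost more to aggregate than tumbling ones of the same length.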
When printed, the pipeline built by buildPipeline() produces the following DAG:
dag
    .vertex("remoteMapJournalSource(cache_AuditLog)").localParallelism(1)
    .vertex("sliding-window-step1").localParallelism(4)
    .vertex("sliding-window-step2").localParallelism(4)
    .vertex("map").localParallelism(4)
    .vertex("Offer_Recommendations").localParallelism(4)
    .vertex("remoteMapSink(cache_OfferRecommendations)").localParallelism(1)
    .edge(between("remoteMapJournalSource(cache_AuditLog)", "sliding-window-step1").partitioned(?))
    .edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
    .edge(between("sliding-window-step2", "map"))
    .edge(between("map", "Offer_Recommendations"))
    .edge(between("Offer_Recommendations", "remoteMapSink(cache_OfferRecommendations)"))
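The two sliding-window-step vertices suggest that Jet runs this aggregation in two stages: sliding-window-step1 accumulates partial counts on each member, and sliding-window-step2 combines the partials for each key. The edge from the source to step1 is partitioned but local, so records are only regrouped by key among the processors of the same member; the edge from step1 to step2 is both partitioned and distributed, so all partial results for a given key end up on one processor of one member. The stdlib-only sketch below models just that routing; the member model, the `ownerOf` hash (a hypothetical stand-in for Jet's partitioner), and all method names are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoStageAggregationSketch {

    // Hypothetical stand-in for Jet's partitioner, NOT the real hash function.
    static int ownerOf(String key, int memberCount) {
        return Math.floorMod(key.hashCode(), memberCount);
    }

    /** Stage 1 (like sliding-window-step1): count the keys seen locally on one member. */
    static Map<String, Long> accumulateLocally(List<String> localKeys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : localKeys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /**
     * Stage 2 (like sliding-window-step2): each partial count travels over a
     * "partitioned + distributed" edge to the single member owning its key,
     * which sums the partials. Returns one key -> total map per member.
     */
    static List<Map<String, Long>> combineOnOwners(List<Map<String, Long>> partials, int memberCount) {
        List<Map<String, Long>> combined = new ArrayList<>();
        for (int i = 0; i < memberCount; i++) {
            combined.add(new TreeMap<>());
        }
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                combined.get(ownerOf(e.getKey(), memberCount))
                        .merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        // Hypothetical records seen by two members; keys stand in for (userId, tranType).
        List<String> member0 = Arrays.asList("alice|LOGIN", "bob|LOGIN", "alice|LOGIN");
        List<String> member1 = Arrays.asList("alice|LOGIN", "bob|PAY");
        List<Map<String, Long>> partials =
                Arrays.asList(accumulateLocally(member0), accumulateLocally(member1));
        List<Map<String, Long>> result = combineOnOwners(partials, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("member " + i + " -> " + result.get(i));
        }
    }
}
```

The design point the sketch illustrates: only one small partial count per key per member crosses the network, instead of every raw record.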
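The DAG output contains additional details/method calls such as partitioned() and distributed().
Do these distribute records based on the key? Also, how does Hazelcast Jet ensure that records are not moved to a different partition?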
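On whether records can move between partitions: with a partitioned edge, a record's destination is a pure function of its key. The key is mapped to a partition ID, and each partition is assigned to one processor, so every record with the same key reaches the same processor. Hazelcast's real partitioner hashes the serialized key into one of 271 partitions by default; the sketch below substitutes `hashCode()` and a hypothetical round-robin partition-to-processor assignment fixed when the job starts, so it models the idea rather than Jet's exact implementation (`partitionId`, `assignPartitions`, and `route` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionedEdgeSketch {

    // Hazelcast's default partition count.
    static final int PARTITION_COUNT = 271;

    /** Simplified stand-in: the real partitioner hashes the serialized key bytes. */
    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Hypothetical round-robin partition -> processor table, computed once per job. */
    static int[] assignPartitions(int processorCount) {
        int[] owner = new int[PARTITION_COUNT];
        for (int p = 0; p < PARTITION_COUNT; p++) {
            owner[p] = p % processorCount;
        }
        return owner;
    }

    /** Routes each key over a "partitioned" edge and returns the inbox of every processor. */
    static List<List<String>> route(List<String> keys, int processorCount) {
        int[] owner = assignPartitions(processorCount);
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (String key : keys) {
            // Same key -> same partition -> same processor, every time.
            inboxes.get(owner[partitionId(key)]).add(key);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "alice|LOGIN", "bob|PAY", "alice|LOGIN", "carol|LOGIN", "alice|LOGIN");
        // 4 processors, matching localParallelism(4) of sliding-window-step1.
        List<List<String>> inboxes = route(keys, 4);
        for (int i = 0; i < inboxes.size(); i++) {
            System.out.println("processor " + i + " <- " + inboxes.get(i));
        }
    }
}
```

Because the mapping depends only on the key and a table that does not change during the run, all three "alice|LOGIN" records land in the same inbox.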