ignite - 可以在 StreamReceiver/Visitor 的单独节点上执行 Ignite Streamer.addData 吗?
问题描述
是否可以从客户端节点进行流注入并在服务器节点中拦截相同的流以在插入缓存之前处理流?
这样做的原因是客户端节点从外部源接收流,并且需要将相同的流注入到基于 AffinityKey 跨多个服务器节点的分区缓存中。需要在每个节点上截取流并以最低延迟进行处理。我本可以使用缓存事件来做到这一点,但 StreamVisitor 应该更快。
以下是我要执行的示例。启动 2 个节点:一个包含流媒体,另一个包含 streamReciever:
公共类 StreamerNode {
public static void main(String[] args) { ...... Ignition.setClientMode(false); 点燃 ignite = Ignition.start(igniteConfiguration);
CacheConfiguration<SeqKey, String> myCfg = new CacheConfiguration<SeqKey, String>("myCache");
......
IgniteCache<SeqKey, String> myCache = ignite.getOrCreateCache(myCfg);
IgniteDataStreamer<SeqKey, String> myStreamer = ignite.dataStreamer(myCache.getName()); // Create Ignite Streamer for windowing data
for (int i = 51; i <= 100; i++) {
String paddedString = org.apache.commons.lang.StringUtils.leftPad(i+"", 7, "0") ;
String word = "TEST_" + paddedString;
SeqKey seqKey = new SeqKey("TEST", counter++ );
myStreamer.addData(seqKey, word) ;
}
}
}
public class VisitorNode {
public static void main(String[] args) { ...... Ignition.setClientMode(false); 点燃 ignite = Ignition.start(igniteConfiguration);
CacheConfiguration<SeqKey, String> myCfg = new CacheConfiguration<SeqKey, String>("myCache");
......
IgniteCache<SeqKey, String> myCache = ignite.getOrCreateCache(myCfg);
IgniteDataStreamer<SeqKey, String> myStreamer = ignite.dataStreamer(myCache.getName()); // Create Ignite Streamer for windowing data
myStreamer.receiver(new StreamVisitor<SeqKey, String>() {
int i=1 ;
@Override
public void apply(IgniteCache<SeqKey, String> cache, Map.Entry<SeqKey, String> e) {
String tradeGetData = e.getValue();
System.out.println(nodeID+" : visitorNode ..count="+ i++ + " received key="+e.getKey() + " : val="+ e.getValue());
//do some processing here before inserting in the cache ..
cache.put(e.getKey(), tradeGetData);
}
});
}
}
解决方案
当然它可以在不同的节点上执行。通常,addData()
在客户端节点上执行,并StreamReceiver
在服务器节点上工作。你不需要做任何特别的事情来实现它。
至于你的帖子的其余部分,你能否用更多的细节和样本来详细说明它?我无法理解所需的设置。
如果您不需要修改数据,则可以使用连续查询,只需对其进行操作。
推荐阅读
- c# - 在 MainActivity.cs 中未识别 Id
- c - 在 C 中使用 fprintf 打印到 stderr
- r - 使用 f(myclass.myfunc) 语法将函数传递给 R
- mysql - 如何从 MySQL FUNCTION 返回一个数字序列?
- amazon-web-services - 如何在 CloudFormation 中为 CodeCommit 制作 SNS 通知
- javascript - 这是一个设计糟糕的 API 吗?
- java - 如何仅使用两个变量找到两点之间的距离,然后存储所有点并获得形状?
- ios - 静默推送通知在带有 FCM 的 iOS 中有声音,如何解决?
- c# - 如何跳过 DataTable 的某些列标题文本并在数据表 C# 中选择其余部分?
- python - 访问两个子列表中的元素而不使用python中的嵌套for循环