java - 在 apache flink 中的 Flink 连接流中面临竞争条件
问题描述
在 flink 连接的流中实现流程功能时面临竞争条件。我Cache Map
正在两个函数之间共享它processElement1
,processElement2
并且由 2 个不同的线程并行调用。
Streams1
--->(发送报价数据)
Streams2
--->(发送lms(忠诚度管理系统数据))
connect=Streams1.connect(Streams2);
connect.process(new TriggerStream);
在TriggerStream Class
我使用唯一 ID 存储数据时:MemberId
至于unique Key
Store & lookup data
in cache。当数据流入时,我没有得到一致的结果
class LRUConcurrentCache<K,V>{
private final Map<K,V> cache;
private final int maxEntries;
public LRUConcurrentCache(final int maxEntries) {
this.cache = new LinkedHashMap<K,V>(maxEntries, 0.75F, true) {
private static final long serialVersionUID = -1236481390177598762L;
@Override
protected boolean removeEldestEntry(Map.Entry<K,V> eldest){
return size() > maxEntries;
}
};
}
//Why we cant lock on the key
public void put(K key, V value) {
synchronized(key) {
cache.put(key, value);
}
}
//get methode
public V get(K key) {
synchronized(key) {
return cache.get(key);
}
}
public class TriggerStream extends CoProcessFunction<IOffer, LMSData, String> {
private static final long serialVersionUID = 1L;
LRUCache cache;
private String offerNode;
String updatedValue, retrivedValue;
Subscriber subscriber;
TriggerStream(){
this.cache== new LRUCache(10);
}
@Override
public void processElement1(IOffer offer) throws Exception {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
mapper.enableDefaultTyping();
// TODO Auto-generated method stub
IOffer latestOffer = offer;
//Check the subscriber is there or not
retrivedValue = cache.get(latestOffer.getMemberId().toString());
if ((retrivedValue == null)) {
//Subscriber is the class that is used and converted as Json String & then store into map
Subscriber subscriber = new Subscriber();
subscriber.setMemberId(latestOffer.getMemberId());
ArrayList<IOffer> offerList = new ArrayList<IOffer>();
offerList.add(latestOffer);
subscriber.setOffers(offerList);
updatedValue = mapper.writeValueAsString(subscriber);
cache.set(subscriber.getMemberId().toString(), updatedValue);
} else {
Subscriber subscriber = mapper.readValue(retrivedValue, Subscriber.class);
List<IOffer> offers = subscriber.getOffers();
offers.add(latestOffer);
updatedValue= mapper.writeValueAsString(subscriber);
cache.set(subscriber.getMemberId().toString(), subscriberUpdatedValue);
}
} catch (Exception pb) {
applicationlogger.error("Exception in Offer Loading:"+pb);
applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
}
applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
}
@Override
public void processElement2(LMSData lms) throws Exception {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
mapper.enableDefaultTyping();
// TODO Auto-generated method stub
//Check the subscriber is there or not
retrivedValue = cache.get(lms.getMemberId().toString());
if(retrivedValue !=null){
Subscriber subscriber = mapper.readValue(retrivedValue, Subscriber.class);
//do some calculations
String updatedValue = mapper.writeValueAsString(subscriber);
//Update value
cache.set(subscriber.getMemberId().toString(), updatedValue);
}
} catch (Exception pb) {
applicationlogger.error("Exception in Offer Loading:"+pb);
applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
}
applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
}
}
解决方案
Flink 不保证 a CoProcessFunction
(或任何其他 Co*Function)以何种顺序摄取数据。在分布式并行任务中维护某种确定性顺序将过于昂贵。
相反,您必须在函数中使用状态和可能的计时器来解决这个问题。您的LRUCache
函数中的 应该保持为状态(可能是键控状态)。否则,一旦发生故障,它将丢失。您可以为第一个流和缓冲区记录添加另一个状态,直到来自第二个流的查找值到达。
推荐阅读
- php - 试图从嵌套数组中提取一组值
- php - 在下拉列表中选择 True 或 False 应在 mysql 表中反映为 o 和 1
- python - How to add object level permission in django admin
- mysql - mysql use case in jpa query
- angular - attachment propriety not working in ionic local notification plugin
- flutter - 检查文本小部件是否具有样式
- android - open wrong App via Notification click in same Android device
- docker - Using Traefik with TLS (acme plugin) on non HTTP port for HTTP traffic
- javascript - ajax 成功时如何更改我的班级数据?
- c# - 已解决:如何从 devexpress C# 项目中的 WinForm 中单击下拉按钮打开用户控件 WinForm?