apache-kafka - 如何以自定义方式从主题恢复全局存储?
问题描述
假设我在从主题获取数据后将数据存储在 Globalstore 中时进行了一些自定义处理,即我正在从消息的值创建自定义键。在本地删除状态后,它是否会以相同的方式再次恢复 Globalstore。
override def process(key: String, value: String): Unit = {
logger.info("telephonyUsersProcessorCounter = "+telephonyUsersProcessorCounter)
telephonyUsersProcessorCounter = telephonyUsersProcessorCounter +1
val telKey = processKey(key)
if (telKey.isDefined) {
val telValue = processValue(value)
if(telValue.isDefined ){
StreamConstants.teleStore.get.put(telKey.get,telValue.get)
val compositeKeyForNumber = telValue.get.enterpriseId + telValue.get.phoneNumber
val compositeKeyForDeviceName = telValue.get.enterpriseId +telValue.get.deviceName
val compositeKeyForNumberAndDeviceName = telValue.get.enterpriseId +telValue.get.phoneNumber+telValue.get.deviceName
val telCompositeKeyForNumber = StreamConstants.teleStore.get.get(compositeKeyForNumber)
val telCompositeKeyForDeviceName = StreamConstants.teleStore.get.get(compositeKeyForDeviceName)
val telCompositeKeyForNumberAndDeviceName = StreamConstants.teleStore.get.get(compositeKeyForNumberAndDeviceName)
if(null !=telCompositeKeyForNumber ){
if(telCompositeKeyForNumber.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
}
if(null != telCompositeKeyForDeviceName){
if(telCompositeKeyForDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
}
if(null != telCompositeKeyForNumberAndDeviceName){
if(telCompositeKeyForNumberAndDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
}
context.forward(telKey.get, telValue.get.toJson.toString())
context.forward(compositeKeyForNumber, telValue.get.toJson.toString())
context.forward(compositeKeyForDeviceName, telValue.get.toJson.toString())
context.forward(compositeKeyForNumberAndDeviceName, telValue.get.toJson.toString())
}else {
StreamConstants.teleStore.get.put(telKey.get,null)
context.forward(telKey.get,null)
}
}
}
使用消息值中的数据创建自定义键,而不是使用主题中的直接键。假设我删除了本地 Global store 。从 compact topic 恢复这个 store 时会发生什么?
解决方案
在恢复时,来自变更日志主题的数据按原样放入全局存储中,跳过任何自定义处理器逻辑。这是一个已知问题:https ://issues.apache.org/jira/browse/KAFKA-4963
推荐阅读
- c - Atmega328P 中的奇怪延迟行为
- sql-server - SQL SERVER 事务日志恢复
- python - 运行包时制作 setup.py 文件 ModuleNotFoundError
- webgl - 无法让 gl.ALPHA 通道在 WebGL 1 中工作
- javascript - 有没有办法从 Window 对象中完全封装 React
- excel - 如何使用 VBA 在 Excel 中仅对可见行进行序列编号?
- django - How to query Django LogEntry by content_type name?
- c - How to intentionally create .dep files?
- laravel - How do I customize laravel fatal error exception?
- java - excluding module/transitive dependencies from Gradle Build