apache-flink - RichParallelSourceFunction 中的水印
问题描述
我正在实现一个 SourceFunction,它从数据库中读取数据。如果停止或崩溃(即保存点和检查点),并且数据只处理一次,该作业应该能够恢复。
到目前为止我所拥有的:
@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{
@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000
def this(clientConfig: Serializable) =
this(clientConfig, DEFAULT_WAIT_TIME_MS)
override def stop(): Unit = {
this.isRunning = false
}
override def open(parameters: Configuration): Unit = {
super.open(parameters)
client = new JDBCClient
}
override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
while (isRunning){
val statement = client.getConnection.createStatement()
val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
while (resultSet.next()) {
val event: String = resultSet.getString("name")
val timestamp: Long = resultSet.getLong("timestamp")
ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}
如何确保只获取尚未处理的数据库行?我假设该ctx
变量将包含有关当前水印的一些信息,以便我可以将查询更改为:
select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark
但它对我没有任何相关的方法。任何想法如何解决这个问题将不胜感激
解决方案
您必须实现CheckpointedFunction以便您可以自己管理检查点。该接口的文档非常全面,但如果您需要一个示例,我建议您查看一个示例。
本质上,您的函数必须实现CheckpointedFunction#snapshotState
以使用 Flink 的托管状态存储您需要的状态,然后在执行恢复时,它将读取相同的状态CheckpointedFunction#initializeState
。
推荐阅读
- ruby-on-rails - facebook auth 自我召回后的sign_in_and_redirect
- python - 如何使用 Python 3 正确操作内存中的 tar 文件?
- c - C++ Libcurl + Cloudflare
- javascript - 在网页上,如果用户根本没有点击过任何页面元素,那么 body 元素是否首先获得任何键盘事件?
- javascript - 尝试添加到我的传单地图时出错,它是 LayerGroup
- go - Goroutines:在哪里关闭
- xmgrace - 使用“按功能”添加垂直线在 XMGrace 中不起作用
- python - 有没有办法在大型机上使用 python 脚本发送 ISPF 命令并获取作业统计信息?
- reactjs - ReactJS - 为什么当我在组件中的选项卡之间切换时它会滞后?
- reactjs - useState 在哪里存储它的值