apache-flink - Flink:如何在数据库中保存行列表
问题描述
现在我正在从文件中读取行并使用以下代码保存在数据库中:
String strQuery = "INSERT INTO public.alarm (id, name, marks) VALUES (?, ?, ?)";
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
.setQuery(strQuery)
.setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR, Types.INTEGER}) //set the types
.finish();
DataStream<Row> rows = FilterStream
.map((tuple)-> {
Row row = new Row(3);
row.setField(0, tuple.f0);
row.setField(1, tuple.f1);
row.setField(2, tuple.f2);
return row;
});
rows.writeUsingOutputFormat(jdbcOutput);
env.execute();
}
}
以上工作正常,它从文件中一一挑选行并将其保存在数据库中。
例如:
如果文件包含:
1, mark, 20
然后数据库条目将如下所示:
id name marks
------------------
1 mark 20
现在要求是每一行,我必须创建 2 个不同的行,它应该如下所示:
例如:
如果文件包含:
1, mark, 20
那么数据库条目应如下所示:
id name marks
------------------
1 mark-1 20
1 mark-2 20
现在我应该返回 List 而不是 row 并且数据流变量应该看起来像DataStream<List<Row>> rows
.
为了实现这一点,我应该在 JDBCOutputFormat 变量中进行哪些更改?
解决方案
推荐阅读
- java - Java EE JSON-P 流 API Parsin 数据对象
- acumatica - 为什么我的网格不能保存多于一行?
- sql-server - 带有列表搜索键的 Sql FREETEXTTABLE
- docker - Jhipster - 生产构建失败
- python - 通过 dict.get() 调用函数
- javascript - 从谷歌表格数据创建事件时间线
- scala - 如何在 apache spark 2.3 中覆盖分区,同时仍使用 insertInto 方法写入镶木地板
- r - R:data.table 在使用连接分配时将值分配给错误的表
- django - Django modelForm 没有将文件保存到数据库
- java - 我如何创建一个使用摇摆计数的计时器