首页 > 解决方案 > Flink:如何在数据库中保存行列表

问题描述

现在我正在从文件中读取行并使用以下代码保存在数据库中:

String strQuery = "INSERT INTO public.alarm (id, name, marks) VALUES (?, ?, ?)";
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("org.postgresql.Driver")
 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
 .setQuery(strQuery)
 .setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR, Types.INTEGER}) //set the types
 .finish();
DataStream<Row> rows = FilterStream
            .map((tuple)-> {
               Row row = new Row(3);                 
               row.setField(0, tuple.f0);            
               row.setField(1, tuple.f1);             
               row.setField(2, tuple.f2);   
               return row;
            });
rows.writeUsingOutputFormat(jdbcOutput);
env.execute();
}
}

以上工作正常,它从文件中一一挑选行并将其保存在数据库中。

例如:

如果文件包含:

1, mark, 20

然后数据库条目将如下所示:

id   name   marks
------------------
1    mark   20

现在要求是每一行,我必须创建 2 个不同的行,它应该如下所示:

例如:

如果文件包含:

1, mark, 20

那么数据库条目应如下所示:

id   name   marks
------------------
1    mark-1   20
1    mark-2   20

现在我应该返回 List 而不是 row 并且数据流变量应该看起来像DataStream<List<Row>> rows.

为了实现这一点,我应该在 JDBCOutputFormat 变量中进行哪些更改?

标签: apache-flinkflink-streaming

解决方案


推荐阅读