java - Flink JDBC 连接多个租户
问题描述
我有以下 2 节课。主要类是Alocalculation,它有一个包含某些值的变量finalOutput 。DataSet<Tuple4<String, String, Double, String>> finalOutput
. 尝试将该值插入到 postgresql 中。变量 connectionValues 具有 json,它具有各种参数,如用户名、密码、驱动程序名,这些参数是从名为 outermap 的哈希映射中接收的。从 Writealos 我可以发送试图插入到 Aloscalculation 中的行对象。但我无法发送变量连接值来设置驱动程序名称、密码、url 和用户名。请建议我怎么做。org_metric_result、org_metric_orgid 等值会相应计算。
我现在用一种新的方式编辑了代码来解决这个问题。但是现在新问题出现在 Writealos 方法中,如果匹配没有发生,我会返回一个空对象。由于在数据库插入时我得到一个异常说空值正在形成并且无法插入。请问谁能解决这个错误?
public class Writealos implements MapFunction<Tuple4<String, String, Double, String>, Row>
{
@Autowired
private Tenantresource outermap;
public Row map(Tuple4<String, String, Double, String> arg0)throws Exception
{
if(arg0.f0.equals(currentKey))
{
Row obj = new Row(7);
String string = RandomStringUtils.randomAlphanumeric(32);
obj.setField(0, string);
obj.setField(1, alos);
obj.setField(2, org_metric_result.toString());
obj.setField(3, Start_Date.toString());
obj.setField(4, End_Date.toString());
obj.setField(5, Execution_Date.toString());
obj.setField(6, org_metric_orgid);
}
return obj;
}
}
public class Aloscalculation
{
@Autowired
private static Tenantresource outermap;
public static void calculateAlos(Fhirresource fhir_resource ) throws Exception
{
String query = "insert into reports (org_metric_id,org_metric_topic, org_metric_result, org_metric_from, org_metric_to, org_metric_executed_on, org_metric_orgid) values (?,?,cast(? as json),cast(? as timestamp),cast(? as timestamp),cast(? as timestamp),?)";
for(String currentKey : outermap.tenantresourcereturn().keySet())
{
JSONObject connection =new JSONObject( outermap.tenantresourcereturn().get(currentKey));
System.out.println(currentKey);
String url=connection.getString("jdbc_url")+":"+connection.getString("port")+"/"+connection.getString("db_name");
System.out.println(url);
finaloutput.map(new Writealos(fhir_resource,currentKey))
.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername()
.setDBUrl()
.setUsername()
.setPassword()
.setQuery(query)
.finish());
}
}
解决方案
你是怎么曝光 public Row map(Tuple4<String, String, Double, String> arg0)
的,是通过服务吗。Spring 提供了一个注解 @ ResponseBody
。您需要有一个匹配的 POJO,spring 将负责转换。其次,您为什么不打算使用任何 ORM 工具(Hibernate、Ibatis 等),它可以简化您的 CRUD 操作。
推荐阅读
- jquery - 使用 jquery 从克隆的输入字段上传多个文件
- ios - Tableview numberOfRowsInSection 返回重复的.count 数字?
- python - 使用外部 CMD 启动程序,检测何时关闭
- python - 将函数应用于 NumPy 数组的唯一值
- api - 端点应该设计多深的资源访问权限
- c# - 如何根据角色(ASP.NET 身份)使用户具有不同的属性
- python - 如何使用 Pandas 向 Dataframe 的每一行添加值?
- python - Spark:如何在 pyspark 中将字节字符串写入 hdfs hadoop 以进行 spark-xml 转换?
- javascript - jquery - 获取所有参数而不是一个
- javascript - Javascript中基于Tensorflow或数组的按位或/与