首页 > 解决方案 > Flink JDBC 连接多个租户

问题描述

我有以下 2 节课。主要类是Alocalculation,它有一个包含某些值的变量finalOutput 。DataSet<Tuple4<String, String, Double, String>> finalOutput. 尝试将该值插入到 postgresql 中。变量 connectionValues 具有 json,它具有各种参数,如用户名、密码、驱动程序名,这些参数是从名为 outermap 的哈希映射中接收的。从 Writealos 我可以发送试图插入到 Aloscalculation 中的行对象。但我无法发送变量连接值来设置驱动程序名称、密码、url 和用户名。请建议我怎么做。org_metric_result、org_metric_orgid 等值会相应计算。

我现在用一种新的方式编辑了代码来解决这个问题。但是现在新问题出现在 Writealos 方法中,如果匹配没有发生,我会返回一个空对象。由于在数据库插入时我得到一个异常说空值正在形成并且无法插入。请问谁能解决这个错误?

    public class Writealos implements MapFunction<Tuple4<String, String, Double, String>, Row> 
    {
        @Autowired
        private Tenantresource outermap;
        public Row map(Tuple4<String, String, Double, String> arg0)throws Exception
        {
            if(arg0.f0.equals(currentKey))
            {
               Row obj = new Row(7);
               String string = RandomStringUtils.randomAlphanumeric(32);
               obj.setField(0, string);
               obj.setField(1, alos);
               obj.setField(2, org_metric_result.toString());
               obj.setField(3, Start_Date.toString());
               obj.setField(4, End_Date.toString());
               obj.setField(5, Execution_Date.toString());
               obj.setField(6, org_metric_orgid);
         }
            return obj; 
        }
    }

    public class Aloscalculation
    {
        @Autowired
    private static Tenantresource outermap;
        public static void calculateAlos(Fhirresource fhir_resource ) throws Exception 
        {
            String query = "insert into reports (org_metric_id,org_metric_topic, org_metric_result, org_metric_from, org_metric_to, org_metric_executed_on, org_metric_orgid) values (?,?,cast(? as json),cast(? as timestamp),cast(? as timestamp),cast(? as timestamp),?)";
         for(String currentKey : outermap.tenantresourcereturn().keySet()) 
         { 
             JSONObject connection =new JSONObject( outermap.tenantresourcereturn().get(currentKey));
             System.out.println(currentKey);
             String url=connection.getString("jdbc_url")+":"+connection.getString("port")+"/"+connection.getString("db_name");
             System.out.println(url);
            finaloutput.map(new Writealos(fhir_resource,currentKey))
                .output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername()
                .setDBUrl()
                .setUsername()
                .setPassword()
                .setQuery(query)
                .finish());
        }
    }

标签: javahashmapapache-flink

解决方案


你是怎么曝光 public Row map(Tuple4<String, String, Double, String> arg0)的,是通过服务吗。Spring 提供了一个注解 @ ResponseBody。您需要有一个匹配的 POJO,spring 将负责转换。其次,您为什么不打算使用任何 ORM 工具(Hibernate、Ibatis 等),它可以简化您的 CRUD 操作。

Spring @ResponseBody 注解在这个 RESTful 应用示例中是如何工作的?


推荐阅读