Unable to push messages to Kafka using Spark batch processing

Problem description

    sparkSession = SparkSession.builder()
            .appName("TDAP DATA SHARING API")
            .config("spark.ui.port", "6787")
            .getOrCreate();

    // Read one partition of the Parquet data set and keep 10 rows with a non-'NULL' m_date.
    Dataset<Row> parquetFileDF = sparkSession.read().parquet("/test/cob_date=2019-04-18/");
    parquetFileDF.createOrReplaceTempView("ParquetTable");
    Dataset<Row> parkSQL = sparkSession.sql("select * from ParquetTable where m_date<>'NULL' limit 10");

    System.out.println("printing the schema of parkSQL");
    parkSQL.printSchema();
    System.out.println("*record count in dataframe*" + parkSQL.count());

    // Pack all columns into a single struct and cast it to a string column named "value".
    System.out.println("printing the schema of df");
    Dataset<Row> df = parkSQL.selectExpr("struct(*) AS value");
    Dataset<Row> df2 = df.selectExpr("CAST(value AS STRING)");
    df2.printSchema();
    df2.show();
    System.out.println("*record count in dataframe*" + df2.count());

    // Batch write to Kafka (options exactly as used in the failing run).
    df2.write().format("kafka")
            .option("kafka.bootstrap.servers", "10.91.134.19:9093")
            .option("kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
            .option("kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
            .option("kafka.ssl.keystore.location", "/hivestage/tmmart/tmmart/sparkJar/kafka.keystore.jks")
            .option("kafka.ssl.keystore.password", "x@123")
            .option("kafka.ssl.key.password", "x@123")
            .option("kafka.ssl.truststore.location", "/x/tmmart/tmmart/sparkJar/kafka.truststore.jks")
            .option("kafka.ssl.truststore.password", "x@UAT")
            .option("kafka.ssl.keystore.type", "JKS")
            .option("kafka.ssl.protocol", "SSL")
            .option("kafka.sasl.mechanism", "SASL")
            .option("kafka.ssl.endpoint.identification.algorithm", "")
            .option("kafka.request.timeout.ms", "300000")
            .option("topic", "dbs_itt_tdap_tswb_load_perf")
            .save();

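I cannot push any messages to the Kafka topic. The write fails with an error like this:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Can anyone suggest what I am doing wrong?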

Tags: apache-kafka, apache-spark-sql, kafka-consumer-api

Solution
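One plausible cause, which the post itself does not confirm: the options in the question never set kafka.security.protocol. Kafka clients default to PLAINTEXT, so if port 9093 is an SSL listener (an assumption here), the producer cannot complete a metadata request and eventually reports the timeout shown above. The sketch below collects the writer options into a map with that setting added. KafkaSslOptions and build are hypothetical names introduced for illustration, not part of Spark or Kafka. Three settings from the question are left out on purpose: kafka.ssl.protocol chooses the SSL/TLS protocol version rather than turning SSL on, kafka.sasl.mechanism only matters for SASL listeners and "SASL" is not a mechanism name, and the key/value deserializer options are consumer settings that a producer does not use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Spark or Kafka class): gathers the options for
// a batch write to an SSL-only Kafka listener into a single map.
final class KafkaSslOptions {

    private KafkaSslOptions() {}

    static Map<String, String> build(String bootstrapServers, String topic,
                                     String keystorePath, String keystorePassword,
                                     String keyPassword,
                                     String truststorePath, String truststorePassword) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("kafka.bootstrap.servers", bootstrapServers);
        opts.put("topic", topic);

        // Assumption: the broker port is an SSL listener. Without this setting
        // the client falls back to the default, PLAINTEXT.
        opts.put("kafka.security.protocol", "SSL");

        opts.put("kafka.ssl.keystore.type", "JKS");
        opts.put("kafka.ssl.keystore.location", keystorePath);
        opts.put("kafka.ssl.keystore.password", keystorePassword);
        opts.put("kafka.ssl.key.password", keyPassword);
        opts.put("kafka.ssl.truststore.location", truststorePath);
        opts.put("kafka.ssl.truststore.password", truststorePassword);

        // An empty value disables hostname verification, as in the question.
        opts.put("kafka.ssl.endpoint.identification.algorithm", "");
        opts.put("kafka.request.timeout.ms", "300000");
        return opts;
    }
}
```

The map can be handed to the writer from the question in one call, for example df2.write().format("kafka").options(KafkaSslOptions.build(...)).save(). If the timeout persists, it is also worth checking that the keystore and truststore paths are readable on every executor and that the broker address is reachable from the cluster nodes.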

