apache-kafka - 无法使用火花批处理在 kafka 中推送消息
问题描述
sparkSession = SparkSession.builder().appName("TDAP DATA SHARING
API").config("spark.ui.port", "6787")
.getOrCreate();
Dataset<Row> parquetFileDF = sparkSession.read().parquet(
"/test/cob_date=2019-04-18/");
parquetFileDF.createOrReplaceTempView("ParquetTable");
Dataset<Row> parkSQL = sparkSession.sql("select * from ParquetTable where m_date<>'NULL' limit 10");
System.out.println("printing the scema of parkSQL");
parkSQL.printSchema();
System.out.println("*record count in dataframe*" + parkSQL.count());
System.out.println("printing the scema of df");
Dataset<Row> df = parkSQL.selectExpr("struct(*) AS value");
Dataset<Row> df2 = df.selectExpr("CAST(value AS STRING)");
df2.printSchema();
df2.show();
System.out.println("*record count in dataframe*" + df2.count());
df2.write().format("kafka").option("kafka.bootstrap.servers", "10.91.134.19:9093")
.option("kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
.option("kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
.option("kafka.ssl.keystore.location", "/hivestage/tmmart/tmmart/sparkJar/kafka.keystore.jks")
.option("kafka.ssl.keystore.password", "x@123")
.option("kafka.ssl.key.password", "x@123")
.option("kafka.ssl.truststore.location", "/x/tmmart/tmmart/sparkJar/kafka.truststore.jks")
.option("kafka.ssl.truststore.password", "x@UAT").option("kafka.ssl.keystore.type", "JKS")
.option("kafka.ssl.protocol", "SSL").option("kafka.sasl.mechanism", "SASL")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.request.timeout.ms", "300000").option("topic", "dbs_itt_tdap_tswb_load_perf").save();
我无法在 kafka 主题中推送任何消息。它给出了类似的错误
org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败。
谁能建议我,我在哪里做错了?
解决方案
推荐阅读
- c - 为什么 malloc 和 sbrk 从单独的段返回地址?
- android - React Native - 在 Android 上切换选项卡时 ScrollableTabView 很慢
- git - 合并拉取请求后 Git 状态不显示差异
- php - 如何将购物车商品产品重定向到 Woocommerce 中的特定页面?
- azure - 如何关闭 Office365 Exchange Powershell 连接
- javascript - Javascript函数增量自动化问题
- spring - Spring 基础存储库实现不影响继承的接口
- django - django 数据库路由:自动故障转移?
- python - 如何在Python中一个接一个地播放音频文件序列?
- angular - TypeError:无法读取未定义的属性“收入”