java - Spark 结构化流中的提交消息
问题描述
我正在使用 spark sturctured streaming (2.3) 和 kafka 2.4 版本。
我想知道如何使用ASync and Sync
提交偏移属性。
如果我设置enable.auto.commit
为真,是Sync or ASync
吗?
如何在 spark 结构化流中定义回调?或者我如何Sync or ASync
在 Spark 结构化流中使用?
提前致谢
我的代码
package sparkProject;
import java.io.StringReader;
import java.util.*;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
public class XMLSparkStreamEntry {
static StructType structType = new StructType();
static {
structType = structType.add("FirstName", DataTypes.StringType, false);
structType = structType.add("LastName", DataTypes.StringType, false);
structType = structType.add("Title", DataTypes.StringType, false);
structType = structType.add("ID", DataTypes.StringType, false);
structType = structType.add("Division", DataTypes.StringType, false);
structType = structType.add("Supervisor", DataTypes.StringType, false);
}
static ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
public static void main(String[] args) throws StreamingQueryException {
SparkConf conf = new SparkConf();
SparkSession spark = SparkSession.builder().config(conf).appName("Spark Program").master("local[*]")
.getOrCreate();
Dataset<Row> ds1 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "Kafkademo").load();
Dataset<Row> ss = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Dataset<Row> finalOP = ss.flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator<Row> call(Row t) throws Exception {
JAXBContext jaxbContext = JAXBContext.newInstance(FileWrapper.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
StringReader reader = new StringReader(t.getAs("value"));
FileWrapper person = (FileWrapper) unmarshaller.unmarshal(reader);
List<Employee> emp = new ArrayList<Employee>(person.getEmployees());
List<Row> rows = new ArrayList<Row>();
for (Employee e : emp) {
rows.add(RowFactory.create(e.getFirstname(), e.getLastname(), e.getTitle(), e.getId(),
e.getDivision(), e.getSupervisor()));
}
return rows.iterator();
}
}, encoder);
Dataset<Row> wordCounts = finalOP.groupBy("firstname").count();
StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();
System.out.println("SHOW SCHEMA");
query.awaitTermination();
}
}
我可以请任何人检查,我在哪里以及如何在上面的代码中实现异步和同步偏移提交?
提前致谢..!
解决方案
请阅读https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read 这是一个很好的来源,虽然在线。
简而言之:
结构化流式处理忽略 Apache Kafka 中的偏移量提交。相反,它依赖于驱动程序端自己的偏移量管理,该驱动程序负责将偏移量分配给执行器,并在处理轮次(epoch 或 micro-batch)结束时对它们进行检查点。
Batck Spark 结构化流和 KAFKA 集成再次以不同的方式工作。
推荐阅读
- lua - 在 Lua 中解码 7 位 GSM
- javascript - 使用js cloneNode复制模板和修改孩子
- android - 两个视图之间从左,右和中心的相等空间android
- python - plotly:如何在窗口中制作独立的情节?
- haskell - haskell中自然顺序的函数组合
- swift - 访问 Firebase Analytics 数据
- php - “此页面无法正常工作 xxxxx.com 目前无法处理此请求。HTTP 错误 500”
- javascript - 通过nodejs连接ssh
- android - Android Glide 错误:找不到符号方法 crossFade()
- c - 如何创建二维数组?