apache-flink - 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它没有正确写入文件
问题描述
我正在从 Kafka 读取数据并尝试以 ORC 格式将其写入 HDFS 文件系统。我使用了他们官方网站上的以下链接参考。但我可以看到 Flink 为所有数据写入完全相同的内容并制作了这么多文件并且所有文件都可以 103KB
请在下面找到我的代码。
object BeaconBatchIngest extends StreamingBase {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
def getTopicConfig(configs: List[Config]): Map[String, String] = (for (config: Config <- configs) yield (config.getString("sourceTopic"), config.getString("destinationTopic"))).toMap
def setKafkaConfig():Unit ={
val kafkaParams = new Properties()
kafkaParams.setProperty("bootstrap.servers","")
kafkaParams.setProperty("zookeeper.connect","")
kafkaParams.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
kafkaParams.setProperty("auto.offset.reset", "latest")
val kafka_consumer:FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(),kafkaParams)
kafka_consumer.setStartFromLatest()
val stream: DataStream[DataParse] = env.addSource(kafka_consumer).map(new temp)
val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
val writerProperties = new Properties()
writerProperties.setProperty("orc.compress", "ZLIB")
val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema),writerProperties,new org.apache.hadoop.conf.Configuration);
val sink: StreamingFileSink[DataParse] = StreamingFileSink
.forBulkFormat(new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/"), writerFactory)
.build()
stream.addSink(sink)
}
def main(args: Array[String]): Unit = {
setKafkaConfig()
env.enableCheckpointing(5000)
env.execute("Kafka_Flink_HIVE")
}
}
class temp extends MapFunction[String,DataParse]{
override def map(record: String): DataParse = {
new DataParse(record)
}
}
class DataParse(data : String){
val parsedJason = parse(data)
val timestamp = compact(render(parsedJason \ "timestamp")).replaceAll("\"", "").toLong
val event = compact(render(parsedJason \ "event")).replaceAll("\"", "")
val source_id = compact(render(parsedJason \ "source_id")).replaceAll("\"", "")
val app = compact(render(parsedJason \ "app")).replaceAll("\"", "")
val json = data
}
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {
override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]
timeColVector.vector(batch.size + 1) = element.timestamp
eventColVector.setVal(batch.size + 1, element.event.getBytes(StandardCharsets.UTF_8))
sourceIdColVector.setVal(batch.size + 1, element.source_id.getBytes(StandardCharsets.UTF_8))
appColVector.setVal(batch.size + 1, element.app.getBytes(StandardCharsets.UTF_8))
jsonColVector.setVal(batch.size + 1, element.json.getBytes(StandardCharsets.UTF_8))
}
}
解决方案
对于批量格式(例如 ORC),StreamingFileSink
每个检查点都会滚动到新文件。如果减少检查点间隔(当前为 5 秒),它就不会写入这么多文件。
推荐阅读
- c# - 使用 SqlCommand 执行时可能导致链接服务器查询失败的原因是什么?
- php - 我有一个视图无法通过我的控制器连接到我的登录模型
- json - 请求正文验证
- python-3.x - 协助添加多个骰子
- sql - 在oracle sql中将4229,4转换为4229.4000000
- kubernetes - 简化 Kubernetes RBAC 定义的创建
- linux - 更改文件linux中的日期格式
- ionic3 - 我想给扫描号码的超链接,但我不知道如何给
- java - 我已经尝试了与此异常相关的所有可用答案,但无法解决。甚至 assembleDebug 也给出了这个输出。我是安卓新手
- php - 如何回显videojs的视频元素