Getting a more informative data set generated by Flume + Kafka

Problem description

I have configured my Flume job to use Kafka as the source, and I am getting data in my HDFS folder, but it is not in a readable, informative format. Do I need to make any changes to my Flume job config?

#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.useFlumeEventFormat = false
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers =${BOOTSTRAP_SERVERS}
MY_AGENT.sources.my-source.kafka.topics = my-topic
MY_AGENT.sources.my-source.kafka.consumer.group.id = my-topic_grp
MY_AGENT.sources.my-source.kafka.consumer.client.id = my-topic_clnt
MY_AGENT.sources.my-source.kafka.compressed.topics = my-topic
MY_AGENT.sources.my-source.kafka.auto.commit.enable = false
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.max.partition.fetch.bytes=704857
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=latest

#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false

#Sink
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.kerberosPrincipal =${user}
MY_AGENT.sinks.my-sink.hdfs.kerberosKeytab =${keytab}
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://nameservice1/my_hdfs/my_table1/timestamp=%Y%m%d
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000

MY_AGENT.sinks.my-sink.hdfs.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.hdfs.schema.registry.url = ${SCHEMA_URL}
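
As a quick sanity check on the Kafka side, a throwaway consumer along the lines of the sketch below can show what the raw payload on my-topic looks like before Flume picks it up; the class name, group id, and broker address are placeholders, while the topic matches the config above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PeekTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // same value as ${BOOTSTRAP_SERVERS}
        props.put("group.id", "peek-my-topic");          // throwaway group, separate from my-topic_grp
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // poll a few times so the group join completes and some records arrive
            for (int i = 0; i < 5; i++) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(2))) {
                    System.out.println(r.value());
                }
            }
        }
    }
}

If this prints readable JSON, the payload is fine on the topic and whatever makes it unreadable happens on the Flume/HDFS-sink side.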

O/P of the data (hex dump of one of the HDFS files):

0000000: 53 45 51 06 21 6f 72 67 2e 61 70 61 63 68 65 2e SEQ.!org.apache.
0000010: 68 61 64 6f 6f 70 2e 69 6f 2e 4c 6f 6e 67 57 72 hadoop.io.LongWr
0000020: 69 74 61 62 6c 65 22 6f 72 67 2e 61 70 61 63 68 itable"org.apach
0000030: 65 2e 68 61 64 6f 6f 70 2e 69 6f 2e 42 79 74 65 e.hadoop.io.Byte
0000040: 73 57 72 69 74 61 62 6c 65 00 00 00 00 00 00 85 sWritable.......
0000050: a6 6f 46 0c f4 16 33 a6 eb 43 c2 21 5c 1b 4f 00 .oF...3..C.!\.O.
0000060: 00 00 18 00 00 00 08 00 00 01 4d c6 1b 01 1f 00 ..........M.....
0000070: 00 00 0c 48 65 6c 6c 6f 20 48 44 46 53 21 0d    ...Hello HDFS!.

This is the kind of output I am getting, but I was expecting JSON-type results. Do I need to change something in my Flume config file?
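
The header in this dump (the SEQ magic followed by the class names org.apache.hadoop.io.LongWritable and org.apache.hadoop.io.BytesWritable) shows that the file on HDFS is a Hadoop SequenceFile, with the event body ("Hello HDFS!") stored as the BytesWritable value. A small reader sketch like the one below, with the file path passed in as a placeholder argument, pulls those bodies back out:

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class DumpSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // placeholder path: point it at one of the files the sink wrote,
        // or at a local copy (file:///tmp/FlumeData...) to avoid HDFS/Kerberos setup
        Path path = new Path(args[0]);

        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            LongWritable key = new LongWritable();      // key class named in the file header
            BytesWritable value = new BytesWritable();  // value class named in the file header
            while (reader.next(key, value)) {
                // the BytesWritable payload is the raw Flume event body
                System.out.println(new String(value.copyBytes(), StandardCharsets.UTF_8));
            }
        }
    }
}

If the recovered body is the expected JSON, the data itself is intact and only the on-disk format chosen by the HDFS sink needs to change.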

Tags: hadoop, apache-kafka, avro, flume

Solution

