hadoop - Getting a more informative dataset out of Flume + Kafka
Problem description
I have configured my Flume job to use Kafka as the source. I am getting data in my target folder, but it is not human-readable. Do I need to change anything in my Flume job configuration below?
#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.useFlumeEventFormat = false
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers = ${BOOTSTRAP_SERVERS}
MY_AGENT.sources.my-source.kafka.topics = my-topic
MY_AGENT.sources.my-source.kafka.consumer.group.id = my-topic_grp
MY_AGENT.sources.my-source.kafka.consumer.client.id = my-topic_clnt
MY_AGENT.sources.my-source.kafka.compressed.topics = my-topic
MY_AGENT.sources.my-source.kafka.auto.commit.enable = false
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms = 100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms = 120000
MY_AGENT.sources.my-source.kafka.consumer.max.partition.fetch.bytes = 704857
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset = latest
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
#Sink
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.hdfs.writeFormat = Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.kerberosPrincipal = ${user}
MY_AGENT.sinks.my-sink.hdfs.kerberosKeytab = ${keytab}
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://nameservice1/my_hdfs/my_table1/timestamp=%Y%m%d
MY_AGENT.sinks.my-sink.hdfs.rollCount = 0
MY_AGENT.sinks.my-sink.hdfs.rollSize = 0
MY_AGENT.sinks.my-sink.hdfs.batchSize = 100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles = 2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout = 50000
MY_AGENT.sinks.my-sink.hdfs.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.hdfs.schema.registry.url = ${SCHEMA_URL}
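A side note on the last two lines: per the Flume user guide, the HDFS sink reads its serializer from the serializer key, with no hdfs. prefix, so a setting spelled hdfs.serializer is silently ignored and the sink falls back to its built-in defaults. A minimal sketch of that key (assuming the Avro serializer class above is the one intended; any serializer.* sub-keys depend on the serializer implementation):
#Sketch: the HDFS sink serializer key has no "hdfs." prefix
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
If the Kafka messages are already JSON strings and only need to be written out verbatim, serializer = text together with hdfs.fileType = DataStream writes one raw event body per line.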
Output data (hexdump of the file on HDFS):
0000000: 53 45 51 06 21 6f 72 67 2e 61 70 61 63 68 65 2e SEQ.!org.apache.
0000010: 68 61 64 6f 6f 70 2e 69 6f 2e 4c 6f 6e 67 57 72 hadoop.io.LongWr
0000020: 69 74 61 62 6c 65 22 6f 72 67 2e 61 70 61 63 68 itable"org.apach
0000030: 65 2e 68 61 64 6f 6f 70 2e 69 6f 2e 42 79 74 65 e.hadoop.io.Byte
0000040: 73 57 72 69 74 61 62 6c 65 00 00 00 00 00 00 85 sWritable.......
0000050: a6 6f 46 0c f4 16 33 a6 eb 43 c2 21 5c 1b 4f 00 .oF...3..C.!\.O.
0000060: 00 00 18 00 00 00 08 00 00 01 4d c6 1b 01 1f 00 ..........M.....
0000070: 00 00 0c 48 65 6c 6c 6f 20 48 44 46 53 21 0d ...Hello HDFS!.
This is the output I am getting, but I was expecting JSON. Does something need to change in my Flume configuration file?
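For what it is worth, the dump above identifies the format: the leading bytes 53 45 51 ("SEQ") followed by the class names org.apache.hadoop.io.LongWritable and org.apache.hadoop.io.BytesWritable are the header of a Hadoop SequenceFile, which is the HDFS sink's default hdfs.fileType. That suggests the DataStream/serializer settings shown above were not in effect when this particular file was written. To decode such a file for inspection, the standard Hadoop CLI can be used (the path and file name below are placeholders):
#hdfs dfs -text decodes SequenceFiles and prints the event payloads as text
hdfs dfs -text hdfs://nameservice1/my_hdfs/my_table1/timestamp=20150901/FlumeData.1441234567890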
Solution