首页 > 技术文章 > flume面试题

JoshWill 2020-06-09 10:19 原文

1.介绍一下flume的channel

channel被设计为event中转临时缓冲区,存储source收集并且没有被sink读取的event,平衡source收集和sink读取的速度,可以将其视为flume内部的消息队列。channel线程安全并且具有事务性,支持source写失败写,sink读失败重复读的操作。常见的类型包括Memory Channel,File Channel,Kafka Channel

2.Memory Channel与File Channel的优缺点

Memory Channel读写速度快,但是存储数据量小。Flume进程挂掉、服务器停机或者重启都会导致数据丢失。在资源充足,不关心数据丢失的场景下可以使用。
File Channel存储容量大,无数据丢失的风险。读写速度慢,但可以通过配置多磁盘文件路径,通过磁盘并行写入提高File Channel性能。Flume将Event顺序写入到File Channel文件的末尾,可以通过配置maxFileSize参数配置数据文件大小,当文件大小达到这个值,创建新的文件,并将该文件设置为只读,直到Flume把该文件读取完成,删除该文件。

3.Kafka Channel的优点有哪些

Memory Channel有很大程度丢失数据的风险,File Channel虽然无数据丢失风险,但如果缓存下来的消息来没来得及写入Sink,Agent就出现故障,File Channel中的消息一样不能被继续使用。Kafka的容错能力解决了这一点。
Flume一旦配置了Kafka为Channel,则不再需要配置Sink组件,减少了Flume启动的进程数,降低了服务器内存、磁盘等资源的使用率。

4.Flume的拦截器是什么

Source在将Event写入到Channel之前可以使用拦截器对Event进行各种形式的处理,Source和Channel之间可以设置多个拦截器,不同的拦截器可以设置不同的规则对Event进行处理

5.Flume的选择器是什么

Source发送的Event通过Channel选择器来选择以哪种方式写入到Channel中,Flume提供了三种类型的选择器,复制选择器、复用选择器以及自定义选择器
1)复制选择器:一个Source以复制的方式将一个Event写入到多个Channel中,不同的Sink可以从不同的Channel中获取到相同的Event。
如果Source没有指定Channel选择器,则该SOurce使用复制Channel选择器,复制选择器有一个配置参数optional,该参数指定的所有channel是可选的,当时间写入到这些channel时有失败发生,则忽略这些失败,否则抛出异常,要求Source重试。

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

2)复用选择器:需要和拦截器配合使用,根据Event的头信息的不同写入到不同的Channel中。

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.optional.US = c4
a1.sources.r1.selector.default = c4

3)自定义选择器:自定义选择器需要实现ChannelSelector接口,或者继承AbstractChannelSelector类。

6.了解Flume的负载均衡和故障转移吗

设置sink组,同一个sink组内有多个sink,不同sink之间可以配置成负载均衡或故障转移

7.Flume的事务机制

flume基于事务传输event(批量传输),使用两个独立的事务分别处理source到channel和channel到sink,失败时会将所有数据回滚进行重试。该事务遵循“最少一次”语义,因此数据不会丢失,但有可能重复。
source-channel之间的重复可以靠TailDir Source自带的断点续传功能解决
put事务:
1)doPut:将批数据先写入到临时缓冲区putLIst(putList就是一个临时的缓冲区)
2)doCommit:检查channel内存队列是否足够合并
3)doRollback:channel内存队列空间不足,回滚,等待内存通道的容量满足合并
channel-sink之间的重复,可以延长等待时间,或者设置UUID拦截器,然后再redis里维护一个布隆表来使下游实时应用去重。
take事务:
1)doTake:将数据取到临时缓冲区takeList
2)将数据发送到下一个节点
3)doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
4)doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

TairDir Source配置

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy

8.Flume参数调优

Source:
1)增加Source个数,可以增大Source读取数据的能力。
2)batchSize参数决定Source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。
Channel:
1)使用File Channel时dataDirs配置多个不同盘下的目录可以提高性能
2)Capacity参数决定Channel可容纳最大的Event条数。transactionCapacty参数决定每次Source往Channel里面写的最大event条数和每次sink从channel里面读的最大event条数,transactionCapacty需要大于Source和Sink的batchSize参数
Sink:
1)适当增加Sink的个数可以增加Sink消费event的能力,但过多的sink会占用系统资源,造成不必要的浪费
2)batchSize参数决定Sink批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬运Event的性能。

推荐阅读