Using Flume's monitoring counters for message processing

rocly 2018-01-03 10:46

Flume is seeing wider and wider use, and sooner or later you will run into performance bottlenecks or message loss. When that happens, the first instinct is usually to reach for the JVM's built-in commands and comb through logs to locate the problem.

Flume, however, already maintains a large set of counters while processing messages, and can publish them via JMX, Ganglia, or JSON over HTTP. To use them, simply add the following options to the startup script:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
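For context, these flags go on the flume-ng command line when launching the agent. A minimal sketch (the agent name a1 and the config file path are placeholders; substitute your own):

```shell
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/flume-conf.properties \
  --name a1 \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=34545
```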

Once the agent is started with these options, open http://188.1.186.XXX:34545/metrics to get the monitoring information as JSON.

If you are on Linux, you can fetch it directly with curl -XGET '188.1.186.XXX:34545/metrics'

The response comes back as a single line of JSON. Paste it into a file with a .json extension in Eclipse and press Ctrl+Shift+F to format it (or pipe the curl output through python -m json.tool):

{
    "SINK.k1": {
        "ConnectionCreatedCount": "1",
        "ConnectionClosedCount": "0",
        "Type": "SINK",
        "BatchCompleteCount": "0",
        "BatchEmptyCount": "0",
        "EventDrainAttemptCount": "7908340",
        "StartTime": "1514878638909",
        "EventDrainSuccessCount": "7657343",
        "BatchUnderflowCount": "250997",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "CHANNEL.c1": {
        "ChannelCapacity": "1000000",
        "ChannelFillPercentage": "0.0",
        "Type": "CHANNEL",
        "ChannelSize": "0",
        "EventTakeSuccessCount": "7908340",
        "EventTakeAttemptCount": "7908466",
        "StartTime": "1514878638906",
        "EventPutAttemptCount": "7908340",
        "EventPutSuccessCount": "7908340",
        "StopTime": "0"
    },
    "SOURCE.r1": {
        "KafkaEventGetTimer": "6468875",
        "AppendBatchAcceptedCount": "0",
        "EventAcceptedCount": "7908340",
        "AppendReceivedCount": "0",
        "StartTime": "1514878643588",
        "AppendBatchReceivedCount": "0",
        "KafkaCommitTimer": "156254",
        "EventReceivedCount": "7908340",
        "Type": "SOURCE",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "0",
        "KafkaEmptyCount": "0",
        "StopTime": "0"
    }
}

The counters on the source, channel, and sink show where the pipeline is struggling: a ChannelFillPercentage that keeps climbing means the sink cannot keep up with the source, and a gap between EventDrainAttemptCount and EventDrainSuccessCount means the sink attempted deliveries that did not complete.
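As a rough illustration, the two most telling signals in the JSON above can be pulled out with nothing but the JDK: how full the channel is, and how many drain attempts the sink has not yet turned into successes. The class name here is made up, and the regex lookup is a minimal sketch over the raw /metrics text, not a real JSON parser:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlumeMetricsCheck {

    // Tiny excerpt of the /metrics payload shown above, used as sample input.
    static final String SAMPLE =
            "{\"SINK.k1\":{\"EventDrainAttemptCount\":\"7908340\","
            + "\"EventDrainSuccessCount\":\"7657343\"},"
            + "\"CHANNEL.c1\":{\"ChannelSize\":\"0\",\"ChannelCapacity\":\"1000000\"}}";

    // Extract the value of a "Name": "value" pair from the raw metrics JSON.
    static long counter(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\":\\s*\"(-?[0-9.]+)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("counter not found: " + name);
        }
        return (long) Double.parseDouble(m.group(1));
    }

    // Drain backlog: events the sink attempted but has not successfully delivered.
    static long drainBacklog(String json) {
        return counter(json, "EventDrainAttemptCount")
                - counter(json, "EventDrainSuccessCount");
    }

    public static void main(String[] args) {
        System.out.println("drain backlog = " + drainBacklog(SAMPLE));
        System.out.println("channel size  = " + counter(SAMPLE, "ChannelSize")
                + " / " + counter(SAMPLE, "ChannelCapacity"));
    }
}
```

For the sample above this reports a drain backlog of 250997 events, which is exactly the kind of number worth watching over time: a steadily growing backlog points at the sink, a full channel points at a slow sink or an over-fast source.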

If you develop your own plugin, you can reuse these same counters to publish statistics, for example:

public class ElasticSearchSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    private SinkCounter sinkCounter;

    @Override
    public void configure(Context context) {
        ...
        buildIndexBuilder(context);
        buildSerializer(context);
        // SinkCounter is Flume's built-in counter set for sinks; it feeds the
        // SINK.* section of the /metrics output shown above.
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), Charsets.UTF_8);
                sinkCounter.incrementEventDrainAttemptCount();  // EventDrainAttemptCount
                if (!Strings.isNullOrEmpty(body)) {
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if (index != null && xContentBuilder != null) {
                        if (!StringUtil.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)
                                    .source(xContentBuilder));
                            sinkCounter.incrementEventDrainSuccessCount();  // EventDrainSuccessCount
...

    @Override
    public synchronized void start() {
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();  // ConnectionCreatedCount
        super.start();
    }

...

 

Summary:

1. When you hit a Flume performance problem, add -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the startup script to enable the monitoring endpoint.

2. Fetch the metrics from http://<hostname>:<port>/metrics.

3. When developing your own plugins, reuse Flume's existing counters to publish statistics.

Note: for more details, see the official documentation:

http://flume.apache.org/FlumeUserGuide.html#monitoring

 

 

-----------------------------------------

 

Problems are nothing to fear; what matters is how you approach solving them!
