首页 > 技术文章 > kubernetes集群EFK日志系统搭建

xuliang666 2019-07-29 18:49 原文

日志收集架构

Kubernetes 集群本身不提供日志收集的解决方案,一般来说有主要的3种方案来做日志收集:

  • 在节点上运行一个 agent 来收集日志
  • 在 Pod 中包含一个 sidecar 容器来收集应用日志
  • 直接在应用程序中将日志信息推送到采集后端

本文使用以下方案:

fluentd-->kafka-->logstash-->elasticsearch-->kibana

搭建 EFK 日志系统

elasticsearch安装使用集群外部环境

192.168.1.122  9200

 kafka安装使用集群外部环境

192.168.1.122 9092

kubernetes集群创建名称空间

kubectl create namespace logging

 

fluentd安装

默认没有kafka插件

使用官方镜像安装:fluent-gem install fluent-plugin-kafka后,commit一下

创建configmap,添加fluentd配置文件

[root@k8s-master ~]# cat fluentd-configmap.yaml 
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host 192.168.1.122
      port 9200
      logstash_format true
      request_timeout    30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>
[root@k8s-master ~]# 
View Code

 

 

 

注意:修改host 和port

部署fluentd

https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/fluentd-elasticsearch/fluentd-es-ds.yaml

[root@k8s-master ~]# cat fluentd-configmap.yaml 
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host 192.168.1.122
      port 9200
      logstash_format true
      request_timeout    30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>
[root@k8s-master ~]# cat fluentd-daemonset.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    version: v2.0.4
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v2.0.4
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
        version: v2.0.4
      # This annotation ensures that fluentd does not get evicted if the node
      # supports critical pod annotation based priority scheme.
      # Note that this does not guarantee admission on the nodes (#40573).
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: cnych/fluentd-elasticsearch:v2.0.4
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config
[root@k8s-master ~]# 
View Code

 

  

创建节点标签

[root@k8s-master ~]# kubectl label nodes k8s-master beta.kubernetes.io/fluentd-ds-ready=true
[root@k8s-master ~]# kubectl label nodes k8s-node1 beta.kubernetes.io/fluentd-ds-ready=true

[root@k8s-master ~]# kubectl get nodes --show-labels
NAME         STATUS   ROLES    AGE   VERSION   LABELS
k8s-master   Ready    master   45d   v1.13.1   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/fluentd-ds-ready=true,beta.kubernetes.io/os=linux,kubernetes.io/hostname=k8s-master,node-role.kubernetes.io/master=
k8s-node1    Ready    <none>   45d   v1.13.1   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/fluentd-ds-ready=true,beta.kubernetes.io/os=linux,kubernetes.io/hostname=k8s-node1
[root@k8s-master ~]# 

 最后应用配置文件

kubectl apply  -f fluentd-daemonset.yaml 

查看pods情况

[root@k8s-master ~]# kubectl get pods -n logging
NAME               READY   STATUS    RESTARTS   AGE
fluentd-es-pjcpx   1/1     Running   0          72m
fluentd-es-x5bck   1/1     Running   0          72m
[root@k8s-master ~]#

 

最后就可以在kibana的dashboard上展示

如下过滤kubernetes集群入口ingressgateway日志信息

 

接入kafka收集日志

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id kafka
      @type kafka2
      @log_level info
      include_tag_key true
      # list of seed brokers
      brokers 192.168.1.122:9092
      use_event_time true
      # buffer settings
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key topic
      default_topic messages
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>
View Code

 

修改output使用kafka插件:https://docs.fluentd.org/output/kafka

brokers 192.168.1.122:9092

topic_key topic

default_topic messages
# producer settings
required_acks -1
compression_codec gzip

重启fluentd 

[root@k8s-master ~]# kubectl get pods -n logging
NAME               READY   STATUS    RESTARTS   AGE
fluentd-es-mdsnz   1/1     Running   0          4d
fluentd-es-tc59t   1/1     Running   0          4d
[root@k8s-master ~]# kubectl logs -f fluentd-es-tc59t -n logging
2019-08-05 07:13:44 +0000 [info]: [kafka] brokers has been set: ["192.168.1.122:9092"]
2019-08-05 07:13:44 +0000 [warn]: parameter 'include_tag_key' in <match **>
  @id kafka
  @type kafka2
  @log_level "info"
  include_tag_key true
  brokers 192.168.1.122:9092
  use_event_time true
  topic_key "topic"
  default_topic "messages"
  required_acks -1
  compression_codec "gzip"
  <buffer>
    @type "file"
    path "/var/log/fluentd-buffers/kubernetes.system.buffer"
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever 
    retry_max_interval 30
    chunk_limit_size 2M
    queue_limit_length 8
    overflow_action block
  </buffer>
  <format>
    @type "json"
  </format>
</match> is not used.
2019-08-05 07:13:44 +0000 [info]: [kafka] initialized kafka producer: fluentd

 启动成功后查看kafka里已经新生成了 messages的topic

在kafka上查看数据情况如下:

[root@dev-log-server kafka]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic messages
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"log":"2019-08-05 07:40:31.966 [INFO][65] client.go 587: Update: api.Update{KVPair:model.KVPair{Key:model.BlockAffinityKey{CIDR:net.IPNet{IPNet:net.IPNet{IP:net.IP{0xa, 0x51, 0x1, 0x0}, Mask:net.IPMask{0xff, 0xff, 0xff, 0x0}}}, Host:\"k8s-node1\"}, Value:(*model.BlockAffinity)(0xc4207e94c0), Revision:\"512369\", TTL:0}, UpdateType:0x2}\n","stream":"stdout","docker":{"container_id":"49f48c1613be45a92ea1fc06efc6c2928347edf1c86a43432871ab8c5cfac323"},"kubernetes":{"container_name":"calico-node","namespace_name":"kube-system","pod_name":"calico-node-2srnw","pod_id":"23241398-8e80-11e9-8cc4-000c29a74c85","labels":{"controller-revision-hash":"99dc95f6f","k8s-app":"calico-node","pod-template-generation":"1"},"host":"k8s-node1","master_url":"https://10.96.0.1:443/api","namespace_id":"aac82294-8e7f-11e9-8cc4-000c29a74c85"}}
{"log":"2019-08-05 07:40:29.278 [INFO][63] ipsets.go 254: Resyncing ipsets with dataplane. family=\"inet\"\n","stream":"stdout","docker":{"container_id":"49f48c1613be45a92ea1fc06efc6c2928347edf1c86a43432871ab8c5cfac323"},"kubernetes":{"container_name":"calico-node","namespace_name":"kube-system","pod_name":"calico-node-2srnw","pod_id":"23241398-8e80-11e9-8cc4-000c29a74c85","labels":{"controller-revision-hash":"99dc95f6f","k8s-app":"calico-node","pod-template-generation":"1"},"host":"k8s-node1","master_url":"https://10.96.0.1:443/api","namespace_id":"aac82294-8e7f-11e9-8cc4-000c29a74c85"}}
{"log":"2019-08-05 07:40:31.137 [INFO][65] client.go 587: Update: api.Update{KVPair:model.KVPair{Key:model.ResourceKey{Name:\"k8s-node1\", Namespace:\"\", Kind:\"Node\"}, Value:(*v3.Node)(0xc4204d2000), Revision:\"512369\", TTL:0}, UpdateType:0x2}\n","stream":"stdout","docker":{"container_id":"49f48c1613be45a92ea1fc06efc6c2928347edf1c86a43432871ab8c5cfac323"},"kubernetes":{"container_name":"calico-node","namespace_name":"kube-system","pod_name":"calico-node-2srnw","pod_id":"23241398-8e80-11e9-8cc4-000c29a74c85","labels":{"controller-revision-hash":"99dc95f6f","k8s-app":"calico-node","pod-template-generation":"1"},"host":"k8s-node1","master_url":"https://10.96.0.1:443/api","namespace_id":"aac82294-8e7f-11e9-8cc4-000c29a74c85"}}

 配置logstash

配置logstash消费messages日志写入elasticsearch

cat config/kafkaInput_fluentd.conf 
input {
    kafka {
        bootstrap_servers => ["192.168.1.122:9092"]
        client_id => "fluentd"
        group_id => "fluentd"
        consumer_threads => 1
        auto_offset_reset => "latest"
        topics => ["messages"]
    }
}

filter {
        json{
                source => "message"
        }
       
       ruby {
       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
       }
      ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
       }
      ruby {
        code => "event.set('find_time',event.get('@timestamp').time.localtime - 8*60*60)"
       }
     mutate {
    remove_field => ["timestamp"]
    remove_field => ["message"]
    }
    
} 
output {
          elasticsearch{
               hosts => ["192.168.1.122:9200"]
               index => "kubernetes_%{+YYYY_MM_dd}"

          }
#    stdout {
#           codec => rubydebug
#           }
}

 启动logstash

nohup ./bin/logstash -f config/kafkaInput_fluentd.conf --config.reload.automatic --path.data=/opt/logstash/data_fluentd 2>&1 > fluentd.log &

 最终日志展示:

 

推荐阅读