首页 > 技术文章 > Elasticsearch+logstash安装部署

zlnp 2020-10-12 20:40 原文

ELK安装部署

ELK安装部署

ELK是elastic公司提供的一套完整的收集日志并分析展示的产品,分别表示Elasticsearch、Logstash和kibana。

Elasticsearch简称ES,是一个建立在全文搜索引擎Apache Lucene基础上的实时的分布式搜索及分析引擎。
Logstash是一个具有实时传输能力的数据收集引擎,由三个组件构成,其中input组件负责收集数据,常用的数据源包括文件类型、数据库类型;filter组件是用来用来过滤数据;output组件是Logstash的输出组件,可以输出到指定的文件,指导的网络端口或者输出到ES。
Kibana为Elasticsearch提供了一个分析和可视化的web平台,通过Kibana可以对Elasticsearch的索 据进行查询,聚合,删除等各种操作。

使用ELK的目的

受理项目为了提高统计功能的效率,采用Logstash将数据库中的业务数据导入到ES中,通过ES提供的方法以及java Api来完成统计,借助ES搜索引擎的特性来提高统计的效率。

安装包版本要求

ELK版本 7.8,JDK要求1.8以上(Linux系统为例):

目前所用版本:elasticsearch-7.8.0 、logstash-7.8.0.tar、jdk-8u151-linux-x64

Elasticsearch安装配置启动

安装jdk

自行搜索安装以及配置环境变量方法,本文不做阐述。

安装ES
  1. 将ES安装包解压到user/local/elk/elastic-search(自定义);
  2. 修改ES配置文件:/config/elasticsearch.yml
cluster.name: web-application
node.name: node-1
# 启动输入密码访问
xpack.security.transport.ssl.enabled: true
xpack.security.enabled: true
# 设置外网可以访问
network.host: 0.0.0.0
# 启动输入密码访问
xpack.security.transport.ssl.enabled: true
xpack.security.enabled: true
# 设置外网可以访问
network.host: 0.0.0.0

  1. 启动elasticsearch
    ES的启动需要切换非root权限的用户,请配置人员自己创建用户来启动ES若启动失败,自行根据错误信息搜索解决方案
    后台启动执行命令:

nohup ./bin/elasticsearch & 或者 ./bin/elasticsearch -d

  1. 验证启动成功
    浏览器访问http://localhost:9200 (localhost为服务器地址)

Logstash安装配置启动

将Logstash安装包解压到user/local/elk/logstash (自定义)
配置文件见第二大节详解
启动Logstash(若启动失败,自行根据异常信息搜索解决方案)

后台启动执行命令:

nohup ./bin/logstash -f config/jdbc_oracle.conf &

Logstash配置文件详解

本文章通过Logstash将数据库中的业务数据导入到elasticsearch中,所以Logstash数据来源为数据库,输出为我们安装好的ES。

SQL文件配置

将需要的SQL文件放在自定义路径下(/home/A1/logstash-7.8.0/config),可配置多个SQL作为输出。

Oracle依赖包配置

Oracle依赖包ojdbc6.jar放在lib文件夹中,绝对路径要写到下面的启动文件中。

配置启动文件

ogstash启动时候需要指定配置文件,在config文件夹下面新建配置文件elasticsearch_sync.conf
配置文件内容如下:

# 输入
input {
  jdbc {
    jdbc_driver_library => "/home/mysql-connector-java-5.1.10.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"
    # 设置监听间隔 各字段含义(从左至右)分、时、天、月、年,全为*默认含义为每分钟都更新
    schedule => "* * * * *"
    # 查询sql,可以通过更新字段来区分那些是需要更新的
    statement_filepath => "/home/logstash-7.8.0/config/complete.sql"
    # 记录最后的运行时间,注意目录需要创建好
    last_run_metadata_path => "/home/logstash-7.8.0/config/ logstash_jdbc_last_run_person"
    use_column_value => false
    tracking_column => "update_time"
    # 分页处理数据
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 类型,对象后面输出的类型
    type => "complete_corporate"
  }
}
# 过滤
filter {
  # 用ruby解决相差8小时的时区问题, update_time必须要通过statement_filepath配置的sql可以查询出来
  ruby { 
      code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)"
  }
}
# 输出
output {
  if[type] == "complete_corporate"{
    elasticsearch {
      hosts => "localhost:9200"
      user => elastic                                                                                                                                                                                         
      password => elastic
      # 索引名
      index => "complete_corporate_index"
      # 文档名
      document_type => "complete_corporate"
      # 文档ID(主键)
      document_id => "%{body_card_no}" 
    }
  }
  # 将数据输出到控制台
  stdout {
      codec => json_lines
  }
}

启动Logstash

执行命令后台启动:

nohup  ./bin/logstash -f config/elasticsearch_sync.conf &

验证

在浏览器输入http://localhost:9200/_cat/indices?v
,查看ES是否有刚才导入的索引。

推荐阅读