python - Clickhouse - 数据转换/解析
问题描述
我们使用 Clickhouse 来存储 HAProxy 和 Kong 日志和指标。
“管道”是围绕 syslog 协议和 rsyslog 构建的,如下所示:HAProxy/Kong -> 本地 rsyslog -> 远程 rsyslog (TCP) -> omclickhouse rsyslog 模块 -> clickhouse。
syslog 消息的格式在 HAProxy 和 Kong 之间当然不同。
HAProxy 消息如下所示:
1.2.3.4:58629 [06/Jun/2020:14:54:59.932] HTTPS~ HAPBACKEND/HAPSERVER 0/0/1/36/37 200 778 - - ---- 32/32/1/1/0 0/0 "GET /api/map/v2/GetSomeStuff/json?Latitude=47.22960133109915&Longitude=-1.5727845858797176 HTTP/1.1"
如此处所述:https ://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 ,
Kong 消息是基于 JSON 的,如下所示:
{
"request": {
"method": "GET",
"uri": "/get",
"url": "http://httpbin.org:8000/get",
"size": "75",
"querystring": {},
"headers": {
"accept": "*/*",
"host": "httpbin.org",
"user-agent": "curl/7.37.1"
},
"tls": {
"version": "TLSv1.2",
如此处所述:https ://docs.konghq.com/hub/kong-inc/syslog/
rsyslog omclickhouse 模块(默认情况下)将所有 syslog 消息插入到名为“SystemEvents”的表中,该表具有以下结构:
┌─severity─┬─facility─┬───────────timestamp─┬─hostname─────────────────┬─tag────────────┬─message──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 6 │ 18 │ 2020-06-06 15:01:00 │ reverseproxy.fqdn │ haproxy[6892]: │ 1.2.3.4:57220 [06/Jun/2020:15:00:59.996] HTTPS~ HAPBACKEND/HAPSRV 15/0/1/2/18 500 617 - - ---- 48/42/9/9/0 0/0 "POST /SOAPService HTTP/1.1" │
└──────────┴──────────┴─────────────────────┴──────────────────────────┴────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
(我们不想涉足自定义 rsyslog 解析 C 模块的开发)
出于报告目的,我们感兴趣的是 syslog 消息字段中包含的 HAProxy(或 Kong)详细信息,而不是整个 syslog 内容本身。因此,为了能够获得“细粒度”查询功能,我们创建了另一个表,例如“HAPROXY_LOGS”,其结构如下:
(`CLIENT_IP` String, `CLIENT_PORT` Int32, `REQUEST_DATE` DateTime, `FRONTEND_NAME` String, `BACKEND_NAME` String, `SERVER_NAME` String, `TREQ` Int32, `TWAIT` Int32, `TCONNECTION` Int32, `TRESPONSE` Int32, `TACTIVE` Int32, `STATUS_CODE` Int32, `BYTES_READ` Int32, `CAPTURED_REQUEST_COOKIE` String, `CAPTURED_RESPONSE_COOKIE` String, `TERMINATION_STATE` String, `ACTCONN` Int32, `FECONN` Int32, `BECONN` Int32, `SRV_CONN` Int32, `RETRIES` Int32, `SRV_QUEUE` Int32, `BACKEND_QUEUE` Int32, `METHOD` String, `REQUEST` String, `PARAMETERS` String, `PROTOCOL` String) ENGINE = MergeTree() PARTITION BY toYYYYMM(REQUEST_DATE) ORDER BY (REQUEST_DATE, TRESPONSE, STATUS_CODE, PARAMETERS) SETTINGS index_granularity = 8192
这就是事情开始变得更奇怪的地方...... Clickhouse 本身似乎既不提供某种调度程序,à la MSSQL,也不提供将编程语言嵌入引擎的方法(PL/pgSQL,PL/ Python - 类似),也不是触发器(我们还没有研究物化视图)。因此,为了将数据从一个表转换和移动到另一个表,cron 每分钟启动一个 shell 脚本,使用 clickhouse-client 获取输入数据,将其通过管道传输到 Python 脚本,然后将其结果本身再次通过管道传输到 clickhouse -插入客户端:
* * * * * { /usr/bin/clickhouse-client < /path/clkh/extract-system-events.sql | /path/clkh/latestmessages-to-TSV-pipe.py 2>/path/clkh/errors-haproxy.log ; } |/usr/bin/clickhouse-client --query="INSERT INTO HAPROXY_LOGS FORMAT TSV" >> /var/log/latestmessages-to-TSV-pipe.log
HAProxy 和 Kong 解析的 Python 脚本不同。
听起来像一个肮脏的黑客......
有没有更好的方法来完成同样的事情?
(尽管有这个 hack,整个东西都很好,报告构建时间大大减少了,Clickhouse 存储了 600M+ 行没有任何问题。)
谢谢
解决方案
我认为在 ClickHouse 之外转换数据是正确的方法。
然而,CH 可以自行承担。让我们以 JSON 日志为例,将使用物化视图和丰富的json 相关函数集):
/* Table that store JSON-logs from several sources. */
CREATE TABLE Raw_Json_Logs (
time DateTime DEFAULT now(),
json String,
log_type LowCardinality(String)
) ENGINE = MergeTree()
ORDER BY time;
/* Table for Kong-logs. */
CREATE MATERIALIZED VIEW Kong_Logs (
time DateTime DEFAULT now(),
raw_json String,
/* define the required log-attributes that should be stored in separate columns */
method LowCardinality(String),
host LowCardinality(String),
/* .. */
raw_response_headers String
/* .. */
) ENGINE = MergeTree()
ORDER BY (time, method, host /* .. */)
AS
SELECT
time,
json AS raw_json,
JSONExtractString(json, 'request', 'method') AS method,
JSONExtractString(json, 'request', 'headers', 'host') AS host,
JSONExtractRaw(json, 'response', 'headers') AS raw_response_headers
/* .. */
FROM Raw_Json_Logs
/* Takes only Kong-specific logs. */
WHERE log_type = 'kong';
测试数据集:
INSERT INTO Raw_Json_Logs(json, log_type)
VALUES ('{"request":{"method":"GET","uri":"/get","url":"http://httpbin.org:8000/get","size":"75","querystring":{},"headers":{"accept":"*/*","host":"httpbin.org","user-agent":"curl/7.37.1"},"tls":{"version":"TLSv1.2","cipher":"ECDHE-RSA-AES256-GCM-SHA384","supported_client_ciphers":"ECDHE-RSA-AES256-GCM-SHA384","client_verify":"NONE"}},"upstream_uri":"/","response":{"status":200,"size":"434","headers":{"Content-Length":"197","via":"kong/0.3.0","Connection":"close","access-control-allow-credentials":"true","Content-Type":"application/json","server":"nginx","access-control-allow-origin":"*"}},"tries":[{"state":"next","code":502,"ip":"127.0.0.1","port":8000},{"ip":"127.0.0.1","port":8000}],"authenticated_entity":{"consumer_id":"80f74eef-31b8-45d5-c525-ae532297ea8e","id":"eaa330c0-4cff-47f5-c79e-b2e4f355207e"},"route":{"created_at":1521555129,"hosts":null,"id":"75818c5f-202d-4b82-a553-6a46e7c9a19e","methods":null,"paths":["/example-path"],"preserve_host":false,"protocols":["http","https"],"regex_priority":0,"service":{"id":"0590139e-7481-466c-bcdf-929adcaaf804"},"strip_path":true,"updated_at":1521555129},"service":{"connect_timeout":60000,"created_at":1521554518,"host":"example.com","id":"0590139e-7481-466c-bcdf-929adcaaf804","name":"myservice","path":"/","port":80,"protocol":"http","read_timeout":60000,"retries":5,"updated_at":1521554518,"write_timeout":60000},"workspaces":[{"id":"b7cac81a-05dc-41f5-b6dc-b87e29b6c3a3","name":"default"}],"consumer":{"username":"demo","created_at":1491847011000,"id":"35b03bfc-7a5b-4a23-a594-aa350c585fa8"},"latencies":{"proxy":1430,"kong":9,"request":1921},"client_ip":"127.0.0.1","started_at":1433209822425}', 'kong');
INSERT INTO Raw_Json_Logs(json, log_type)
VALUES ('{}', 'other_type');
推荐阅读
- django - django表单渲染api,自定义渲染器示例
- mysql - 如何从节点js中的mysql回调函数返回数据
- python - 如何添加到 dicts 键并将其格式化为字符串?
- wordpress - 在搜索表单 woocommerce 中添加类别下拉列表
- java - 改造、Kotlin 和访问隐藏方法灰名单 API
- css - 如何在 Tailwind 中转换背景位置:%
- ios - 如何始终在具有动态高度的视图上获得圆角(角半径)?
- c++ - std::declval 如何返回值?
- ios - 带有完成处理程序的 UIImageView
- ruby-on-rails - TargetRubyVersion 参数,在 AllCops Not Working 下