Tags: python, haproxy, rsyslog, clickhouse, kong

ClickHouse - data transformation/parsing


We are using ClickHouse to store HAProxy and Kong logs and metrics.

The "pipeline" is built around the syslog protocol and rsyslog, as follow : HAProxy/Kong -> local rsyslog -> remote rsyslog (TCP) -> omclickhouse rsyslog module -> clickhouse.

The syslog message format of course differs between HAProxy and Kong.

HAProxy messages look like this:

1.2.3.4:58629 [06/Jun/2020:14:54:59.932] HTTPS~ HAPBACKEND/HAPSERVER 0/0/1/36/37 200 778 - - ---- 32/32/1/1/0 0/0 "GET /api/map/v2/GetSomeStuff/json?Latitude=47.22960133109915&Longitude=-1.5727845858797176 HTTP/1.1"

as described here: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3

Kong messages are JSON-based and look like this:

{
    "request": {
        "method": "GET",
        "uri": "/get",
        "url": "http://httpbin.org:8000/get",
        "size": "75",
        "querystring": {},
        "headers": {
            "accept": "*/*",
            "host": "httpbin.org",
            "user-agent": "curl/7.37.1"
        },
        "tls": {
            "version": "TLSv1.2",

as described here: https://docs.konghq.com/hub/kong-inc/syslog/

The rsyslog omclickhouse module inserts (by default) each whole syslog message into a table named "SystemEvents", which has the following structure:

┌─severity─┬─facility─┬───────────timestamp─┬─hostname─────────────────┬─tag────────────┬─message──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│        6 │       18 │ 2020-06-06 15:01:00 │ reverseproxy.fqdn        │ haproxy[6892]: │  1.2.3.4:57220 [06/Jun/2020:15:00:59.996] HTTPS~ HAPBACKEND/HAPSRV 15/0/1/2/18 500 617 - - ---- 48/42/9/9/0 0/0 "POST /SOAPService HTTP/1.1" │
└──────────┴──────────┴─────────────────────┴──────────────────────────┴────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

(We didn't want to get into developing a custom rsyslog C parsing module.)
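
For reference, the SystemEvents table behind the output above could be declared roughly as follows (a sketch inferred from the visible columns, not necessarily omclickhouse's exact definition):

CREATE TABLE SystemEvents
(
    severity UInt8,
    facility UInt8,
    timestamp DateTime,
    hostname String,
    tag String,
    message String
) ENGINE = MergeTree()
ORDER BY timestamp;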

For reporting purposes, we are interested in the HAProxy (or Kong) details contained in the syslog message field, rather than in the whole syslog record itself. So, to get "fine-grained" querying capabilities, we created another table, say "HAPROXY_LOGS", with the following structure:

CREATE TABLE HAPROXY_LOGS
(
    `CLIENT_IP` String,
    `CLIENT_PORT` Int32,
    `REQUEST_DATE` DateTime,
    `FRONTEND_NAME` String,
    `BACKEND_NAME` String,
    `SERVER_NAME` String,
    `TREQ` Int32,
    `TWAIT` Int32,
    `TCONNECTION` Int32,
    `TRESPONSE` Int32,
    `TACTIVE` Int32,
    `STATUS_CODE` Int32,
    `BYTES_READ` Int32,
    `CAPTURED_REQUEST_COOKIE` String,
    `CAPTURED_RESPONSE_COOKIE` String,
    `TERMINATION_STATE` String,
    `ACTCONN` Int32,
    `FECONN` Int32,
    `BECONN` Int32,
    `SRV_CONN` Int32,
    `RETRIES` Int32,
    `SRV_QUEUE` Int32,
    `BACKEND_QUEUE` Int32,
    `METHOD` String,
    `REQUEST` String,
    `PARAMETERS` String,
    `PROTOCOL` String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(REQUEST_DATE)
ORDER BY (REQUEST_DATE, TRESPONSE, STATUS_CODE, PARAMETERS)
SETTINGS index_granularity = 8192;
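
For context, a typical reporting query over this table looks something like this (an illustrative example, not taken verbatim from our reports):

SELECT
    BACKEND_NAME,
    quantile(0.95)(TRESPONSE) AS p95_response_time,
    countIf(STATUS_CODE >= 500) AS server_errors
FROM HAPROXY_LOGS
WHERE REQUEST_DATE >= now() - INTERVAL 1 DAY
GROUP BY BACKEND_NAME
ORDER BY p95_response_time DESC;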

This is where things start to, well, get weirder... ClickHouse itself does not seem to provide a scheduler (à la MSSQL), a way to embed a programming language into the engine (PL/pgSQL- or PL/Python-like), or triggers (we haven't investigated materialized views yet). So, to transform and move the data from one table to another, a shell script is launched by cron every minute; it uses clickhouse-client to fetch the input data, pipes it through a Python script, and pipes the result back into clickhouse-client for insertion:

* * * * * { /usr/bin/clickhouse-client < /path/clkh/extract-system-events.sql | /path/clkh/latestmessages-to-TSV-pipe.py 2>/path/clkh/errors-haproxy.log ; } |/usr/bin/clickhouse-client --query="INSERT INTO HAPROXY_LOGS FORMAT TSV" >> /var/log/latestmessages-to-TSV-pipe.log
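
The extraction query itself is essentially a SELECT over SystemEvents; a simplified shape (the tag filter and time window shown here are illustrative, not the exact file contents):

SELECT message
FROM SystemEvents
WHERE tag LIKE 'haproxy%'                     /* illustrative tag filter */
  AND timestamp >= now() - INTERVAL 1 MINUTE  /* illustrative window matching the cron period */
FORMAT TSVRaw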

The Python parsing script differs between HAProxy and Kong.

Sounds like a dirty hack...

Is there a better way to accomplish the same thing?

(Despite this hack, the whole thing works great: report build times were reduced by a large factor, and ClickHouse stores 600M+ rows without any issue.)

Thanks


Solution

  • I think transforming data outside ClickHouse is the right way.

    Nevertheless, ClickHouse can take this on itself. Let's consider JSON logs as an example and use a materialized view together with the rich set of JSON-related functions:

    /* Table that store JSON-logs from several sources. */
    CREATE TABLE Raw_Json_Logs (
      time DateTime DEFAULT now(),
      json String,
      log_type LowCardinality(String)
    ) ENGINE = MergeTree()
    ORDER BY time;
    
    /* Table for Kong-logs. */
    CREATE MATERIALIZED VIEW Kong_Logs (
      time DateTime DEFAULT now(),
      raw_json String,
      /* define the required log-attributes that should be stored in separate columns */
      method LowCardinality(String),
      host LowCardinality(String),
      /* .. */
      raw_response_headers String
      /* .. */
    ) ENGINE = MergeTree()
    ORDER BY (time, method, host /* .. */)
    AS 
    SELECT 
      time,
      json AS raw_json,
      JSONExtractString(json, 'request', 'method') AS method,
      JSONExtractString(json, 'request', 'headers', 'host') AS host,
      JSONExtractRaw(json, 'response', 'headers') AS raw_response_headers
      /* .. */
    FROM Raw_Json_Logs
    /* Takes only Kong-specific logs. */
    WHERE log_type = 'kong';
    
    

    Test data set:

    INSERT INTO Raw_Json_Logs(json, log_type)
    VALUES ('{"request":{"method":"GET","uri":"/get","url":"http://httpbin.org:8000/get","size":"75","querystring":{},"headers":{"accept":"*/*","host":"httpbin.org","user-agent":"curl/7.37.1"},"tls":{"version":"TLSv1.2","cipher":"ECDHE-RSA-AES256-GCM-SHA384","supported_client_ciphers":"ECDHE-RSA-AES256-GCM-SHA384","client_verify":"NONE"}},"upstream_uri":"/","response":{"status":200,"size":"434","headers":{"Content-Length":"197","via":"kong/0.3.0","Connection":"close","access-control-allow-credentials":"true","Content-Type":"application/json","server":"nginx","access-control-allow-origin":"*"}},"tries":[{"state":"next","code":502,"ip":"127.0.0.1","port":8000},{"ip":"127.0.0.1","port":8000}],"authenticated_entity":{"consumer_id":"80f74eef-31b8-45d5-c525-ae532297ea8e","id":"eaa330c0-4cff-47f5-c79e-b2e4f355207e"},"route":{"created_at":1521555129,"hosts":null,"id":"75818c5f-202d-4b82-a553-6a46e7c9a19e","methods":null,"paths":["/example-path"],"preserve_host":false,"protocols":["http","https"],"regex_priority":0,"service":{"id":"0590139e-7481-466c-bcdf-929adcaaf804"},"strip_path":true,"updated_at":1521555129},"service":{"connect_timeout":60000,"created_at":1521554518,"host":"example.com","id":"0590139e-7481-466c-bcdf-929adcaaf804","name":"myservice","path":"/","port":80,"protocol":"http","read_timeout":60000,"retries":5,"updated_at":1521554518,"write_timeout":60000},"workspaces":[{"id":"b7cac81a-05dc-41f5-b6dc-b87e29b6c3a3","name":"default"}],"consumer":{"username":"demo","created_at":1491847011000,"id":"35b03bfc-7a5b-4a23-a594-aa350c585fa8"},"latencies":{"proxy":1430,"kong":9,"request":1921},"client_ip":"127.0.0.1","started_at":1433209822425}', 'kong');
    
    INSERT INTO Raw_Json_Logs(json, log_type)
    VALUES ('{}', 'other_type');
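
    After the first INSERT, the materialized view fires and Kong_Logs receives one parsed row (try SELECT method, host FROM Kong_Logs), while the '{}' row is filtered out by the log_type predicate.

    The same idea can, in principle, replace the cron/Python step for the HAProxy logs as well: a materialized view fires on every insert into its source table, so a view over SystemEvents that parses message with ClickHouse's regex functions keeps a fine-grained table up to date with no scheduler at all. A minimal sketch, assuming the default SystemEvents layout shown in the question (the regex and the column subset are illustrative, not a drop-in for the full HAPROXY_LOGS schema):

    CREATE MATERIALIZED VIEW Haproxy_Parsed
    ENGINE = MergeTree()
    PARTITION BY toYYYYMM(REQUEST_DATE)
    ORDER BY (REQUEST_DATE, STATUS_CODE)
    AS
    WITH extractGroups(message,
        '(\\S+):(\\d+) \\[[^\\]]+\\] (\\S+) (\\S+)/(\\S+) (-?\\d+)/(-?\\d+)/(-?\\d+)/(-?\\d+)/(-?\\d+) (\\d{3}) (\\d+)'
    ) AS g
    SELECT
        timestamp            AS REQUEST_DATE,  /* reuse the syslog timestamp instead of re-parsing the bracketed date */
        g[1]                 AS CLIENT_IP,
        toInt32OrZero(g[2])  AS CLIENT_PORT,
        g[3]                 AS FRONTEND_NAME,
        g[4]                 AS BACKEND_NAME,
        g[5]                 AS SERVER_NAME,
        toInt32OrZero(g[9])  AS TRESPONSE,     /* Tr, the 4th of HAProxy's 5 timers */
        toInt32OrZero(g[11]) AS STATUS_CODE,
        toInt32OrZero(g[12]) AS BYTES_READ
    FROM SystemEvents
    WHERE tag LIKE 'haproxy%' AND notEmpty(g);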