Search code examples
Tags: mysql, elasticsearch, utf-8, logstash, logstash-jdbc

logstash-input-jdbc: how to use UTF-8 characters in a statement


I use logstash-input-jdbc to sync my database to elasticsearch.

Environment: Logstash 7.5, Elasticsearch 7.5, mysql-connector-java-5.1.48.jar, logstash-input-jdbc 4.3.16

materials.conf:

# Poll the MySQL database sc_education over JDBC and emit the rows returned by
# the statement stored in ./materials.sql.
input {
    jdbc {
      # NOTE(review): this URL carries no useUnicode/characterEncoding parameters,
      # so the connection charset is left to the driver - a likely reason why
      # non-ASCII literals inside the statement come back as "?"; confirm.
      jdbc_connection_string => "jdbc:mysql://localhost:3306/sc_education"
      jdbc_driver_library => "connector/mysql-connector-java-5.1.48.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_user => "dauser"
      jdbc_password => "daname"
      # Fetch the result set in pages of 50 rows.
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50"
      # The SQL is read from a file instead of being given inline.
      statement_filepath => "./materials.sql"
      # Cron-style schedule: five asterisks means once every minute.
      schedule => "* * * * *"
      # Persist the last-run marker (referenced as :sql_last_value in the
      # statement) in ./materials.info between runs.
      last_run_metadata_path => "./materials.info"
      record_last_run => true
      # NOTE(review): per the plugin docs tracking_column only takes effect when
      # use_column_value => true, which is not set here - confirm the intent.
      tracking_column => updated_at
      # NOTE(review): this codec setting did not prevent the "?" characters shown
      # in the output below.
      codec => plain { charset => "UTF-8"}
      # parameters => { "favorite_artist" => "Beethoven" }
      # statement => "SELECT * from songs where artist = :favorite_artist"
    }
}

# Parse JSON from the "message" field and remove that field afterwards.
# NOTE(review): nothing in this pipeline is shown to populate a "message" field
# (the sample output has one field per selected column) - confirm this filter
# is actually needed.
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

# Index every event into the local Elasticsearch node and also echo it to stdout.
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "materials"
        # The row's material_id is used as the Elasticsearch document id.
        document_id => "%{material_id}"
    }
    stdout {
        # Print each event as one JSON object per line.
        codec => json_lines
    }
}

materials.sql:

-- Map the numeric grade, subject and term ids to display names.
-- Uses the simple CASE form, where every WHEN is a plain value compared with
-- the CASE operand. Putting a comparison such as grade_id = 1 after WHEN would
-- compare the operand with that comparison's 0/1 result, so only ids 0 and 1
-- could ever match.
SELECT  material_name,material_id,
        CASE grade_id
                WHEN  1 THEN "一年级"
                WHEN  2 THEN "二年级"
                WHEN  3 THEN "三年级"
                WHEN  4 THEN "四年级"
                WHEN  5 THEN "五年级"
                WHEN  6 THEN "六年级"
                WHEN  7 THEN "初一"
                WHEN  8 THEN "初二"
                WHEN  9 THEN "初三"
                WHEN 10 THEN "高一"
                WHEN 11 THEN "高二"
                WHEN 12 THEN "高三"
                ELSE "" END as grade,
        CASE subject_id
                WHEN  1 THEN "数学"
                WHEN  2 THEN "物理"
                WHEN  3 THEN "化学"
                WHEN  4 THEN "语文"
                WHEN  5 THEN "英语"
                WHEN  6 THEN "科学"
                WHEN  7 THEN "音乐"
                WHEN  8 THEN "绘画"
                WHEN  9 THEN "政治"
                WHEN 10 THEN "历史"
                WHEN 11 THEN "地理"
                WHEN 12 THEN "生物"
                WHEN 13 THEN "奥数"
                ELSE "" END as subject,
        CASE course_term_id
                WHEN 1 THEN "春"
                WHEN 2 THEN "暑"
                WHEN 3 THEN "秋"
                WHEN 4 THEN "寒"
                ELSE "" END as season,
        created_at, updated_at from sc_materials where updated_at > :sql_last_value and material_id in (2025,317,2050);

./bin/logstash -f materials.conf

{"@version":"1","updated_at":"2019-08-19T02:04:54.000Z","season":"?","grade":"","created_at":"2019-08-19T02:04:54.000Z","@timestamp":"2019-12-13T01:02:01.907Z","material_name":"test material seri''al","material_id":2025,"subject":"??"}
{"@version":"1","updated_at":"2019-08-26T09:25:35.000Z","season":"","grade":"","created_at":"2019-08-26T09:25:35.000Z","@timestamp":"2019-12-13T01:02:01.908Z","material_name":"人教版高中英语必修三第10讲Unit5 Canada The True North语法篇A学生版2.pdf","material_id":2050,"subject":""}
{"@version":"1","updated_at":"2019-08-10T06:50:48.000Z","season":"?","grade":"","created_at":"2019-05-27T06:26:44.000Z","@timestamp":"2019-12-13T01:02:01.880Z","material_name":"90aca2238832143fb75dcf0fe6dbbfa9.pdf","material_id":317,"subject":""}

The Chinese characters stored in the database come through correctly, but the Chinese characters written in the statement itself are turned into `?`.


Solution

  • For me, adding characterEncoding=utf8 to the connection string did not work.

    After I added this stdin input to the input section,

     # Extra stdin input whose plain codec is explicitly set to UTF-8.
     stdin {
            codec => plain { charset => "UTF-8"}
       }
    

    it worked well.

    Here is my working conf file. It's a bit late to post an answer, but I hope it helps someone.

    # Two inputs: a JDBC query against PostgreSQL run on a schedule, plus a
    # stdin input with an explicit UTF-8 codec.
    input {
        jdbc {
            # NOTE(review): useTimezone, useLegacyDatetimeCode, serverTimezone,
            # useUnicode and characterEncoding look like MySQL Connector/J options;
            # confirm whether the PostgreSQL driver honors them.
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
            jdbc_user => "atlas"
            jdbc_password => "atlas"
            jdbc_validate_connection => true
            jdbc_driver_library => "/lib/postgres-42-test.jar"
            jdbc_driver_class => "org.postgresql.Driver"
            # Cron-style schedule: five asterisks means once every minute.
            schedule => "* * * * *"
            statement => "SELECT * from naver_city"
       }
       # The addition the answer credits with making the encoding work.
       stdin {
            codec => plain { charset => "UTF-8"}
       }
    }
    # Write each event to Elasticsearch keyed by its "code" field and print it
    # to stdout for debugging.
    output {
     elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "2020-04-23-2"
            # update + doc_as_upsert: update the document with this id, using the
            # event itself as the document when it does not exist yet.
            doc_as_upsert => true
            action => "update"
            document_id => "%{code}"
            }
     stdout { codec => rubydebug }
    }