
Skip the sink step in Kafka when posted data is corrupted


On the Java server side, after some processing, I post the log data (JSON format) from the server to Kafka via a RESTful web service.

On the HDFS side my sink type is Avro, so to parse JSON (the source) into Avro (the destination) I am using a morphline and an Avro schema.
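
For illustration, a minimal Avro schema of the kind used here might look like the sketch below; the record name and the string field types are assumptions, and only the fields that appear in the morphline paths later in this post are shown:

    {
      "type": "record",
      "name": "LogEvent",
      "fields": [
        { "name": "PROJECT_NAME", "type": "string" },
        { "name": "WSDL_NAME",    "type": "string" },
        { "name": "MESSAGE_OUT",  "type": "string" }
      ]
    }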

If the posted data does not fit the morphline or the Avro schema, I normally get the error below:

Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in string value

Also, once I get this error, the offset does not move anymore. In short, if Kafka hits this error even once, it can no longer sink any posted data.

To avoid this error, I suppose there are two solutions. The first is to write, at the server side, a JSON validator for the Avro schema used on the big data side. The second, which I prefer, is to skip (and not sink) log data that is not formatted for the requested Avro schema; after skipping a corrupted record, Kafka should still sink the next suitable data.
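
A rough sketch of the first option, assuming the Avro Java library is on the server's classpath (the class name and schema file are hypothetical; note that Avro's JSON decoder expects Avro's own JSON encoding, so for schemas with union/nullable fields this is only an approximate pre-check):

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;

    public class AvroJsonValidator {

        private final Schema schema;
        private final GenericDatumReader<GenericRecord> reader;

        public AvroJsonValidator(File avscFile) throws IOException {
            this.schema = new Schema.Parser().parse(avscFile);
            this.reader = new GenericDatumReader<>(schema);
        }

        // returns true if the JSON string decodes cleanly against the schema
        public boolean isValid(String json) {
            try {
                // jsonDecoder throws if the input is not well-formed JSON;
                // read(...) throws if the fields do not match the schema
                reader.read(null, DecoderFactory.get().jsonDecoder(schema, json));
                return true;
            } catch (IOException | RuntimeException e) {
                // malformed JSON or fields that do not match the schema
                return false;
            }
        }
    }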

I think this should be possible by adding some parameters to the Flume or Kafka configuration file. So how can I skip the sink step when the posted data does not fit the requested schema or the requested morphline?


Solution

  • I solved the problem on the morphline side.

    I added a try/catch-style block to the morphline, like this:

    morphlines: [
      {
        id: convertJsonToAvro
        importCommands: [ "org.kitesdk.**" ]
        commands: [
          {
            tryRules {
              catchExceptions: true
              rules: [
                # rule 1: the "try" block
                {
                  commands: [
                    # parse the JSON from the attachment body
                    { readJson {} }

                    # extract JSON objects into fields
                    {
                      extractJsonPaths {
                        flatten: true
                        paths: {
                          PROJECT_NAME: /PROJECT_NAME
                          WSDL_NAME: /WSDL_NAME
                          ....
                          ....
                          ....
                          MESSAGE_OUT: /MESSAGE_OUT
                        }
                      }
                    }

                    # convert the extracted fields to an avro object
                    # described by the schema in this file
                    {
                      toAvro {
                        schemaFile: /u0x/myPath/myAvroSchema.avsc
                      }
                    }

                    # serialize the object as avro
                    {
                      writeAvroToByteArray {
                        format: containerlessBinary
                      }
                    }
                  ]
                }

                # rule 2: the "catch" block
                {
                  commands: [
                    { logWarn { format: "Ignoring record with unsupported input format in myLogService: {}", args: ["@{}"] } }
                    { dropRecord {} }
                  ]
                }
              ]
            }
          }
        ]
      }
    ]
    

    In tryRules, catchExceptions: true forces all exceptions thrown inside the rules to be caught.

    In rules you can write as many commands blocks as you want. The rules are tried in order, and if one of them throws an exception, the next rule runs; the last rule therefore behaves like a catch block. In my case, if the first commands block fails, the last (second) block runs; if the first block runs cleanly, the last block is never executed.

    So when readJson {} fails in the first commands block, it throws an exception, the last rule (the catch block) handles it, and the corrupted record is not sunk to the Kafka topic because dropRecord {} runs. The next suitable record is then processed normally.

    For detailed documentation, you can visit the Kite SDK morphlines reference.
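
    If you want to verify this try/catch behavior outside Flume, here is a minimal sketch that drives the morphline directly with the Kite API, assuming the kite-morphlines-core dependency. The morphline id (convertJsonToAvro) comes from the config above; the .conf path and the sample payload are assumptions:

        import java.io.File;
        import java.nio.charset.StandardCharsets;
        import java.util.ArrayList;
        import java.util.List;

        import org.kitesdk.morphline.api.Command;
        import org.kitesdk.morphline.api.MorphlineContext;
        import org.kitesdk.morphline.api.Record;
        import org.kitesdk.morphline.base.Compiler;
        import org.kitesdk.morphline.base.Fields;
        import org.kitesdk.morphline.base.Notifications;

        public class MorphlineDryRun {

            public static void main(String[] args) throws Exception {
                // collects the records that survive the morphline (i.e. were not dropped)
                final List<Record> collected = new ArrayList<>();
                Command collector = new Command() {
                    public Command getParent() { return null; }
                    public void notify(Record notification) {}
                    public boolean process(Record record) { collected.add(record); return true; }
                };

                MorphlineContext context = new MorphlineContext.Builder().build();
                // the .conf file path is an assumption for this sketch
                Command morphline = new Compiler().compile(
                        new File("/u0x/myPath/morphline.conf"), "convertJsonToAvro", context, collector);

                // feed one deliberately corrupted record through the pipeline
                Record record = new Record();
                record.put(Fields.ATTACHMENT_BODY,
                        "{ not valid json".getBytes(StandardCharsets.UTF_8));

                Notifications.notifyStartSession(morphline);
                // with tryRules, process(...) still returns true: the fallback rule
                // logs a warning and drops the record instead of failing the pipeline
                boolean ok = morphline.process(record);
                System.out.println("processed=" + ok + ", surviving records=" + collected.size());
            }
        }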