
How to reference nested JSON within PyFlink SQL when JSON schema varies?


I have a stream of events I wish to process using PyFlink, where the events are taken from AWS EventBridge. The events in this stream share a number of common fields, but the detail field varies according to the value of the source and/or detail-type field. For example, here is an event from EC2:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

The id, version, source etc. fields are consistent across event types, but note that a different event type in the stream will have a differently shaped detail field; e.g. the detail of a CodeBuild event might look like this:

  "detail":{
    "build-status": "SUCCEEDED",
    "project-name": "my-sample-project",
    "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
    "additional-information": {
      "artifact": {
        "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
        "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
        "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
      }
     }
   }

I'd like to create statements like the ones below that split the stream by source (or detail-type) to feed different sub-tables, where each sub-table is processed differently. From PyFlink I would submit both inserts together, as sketched after the SQL:

INSERT INTO ec2_event_table SELECT * FROM input_table WHERE source = 'aws.ec2'
INSERT INTO codebuild_event_table SELECT * FROM input_table WHERE source = 'aws.codebuild'
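
In PyFlink, multiple INSERT statements like these are typically grouped into a statement set so they run as one job; a minimal sketch, assuming an existing TableEnvironment named t_env and that both sink tables are already defined:

# Group both INSERTs into one job; the planner can then share the
# scan of input_table between the two sinks.
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(
    "INSERT INTO ec2_event_table SELECT * FROM input_table WHERE source = 'aws.ec2'")
stmt_set.add_insert_sql(
    "INSERT INTO codebuild_event_table SELECT * FROM input_table WHERE source = 'aws.codebuild'")
stmt_set.execute()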

How do I define input_table (the input table containing the multiplexed events)? I've tried:

CREATE TABLE input_table (
    source VARCHAR,
    detail MAP
)

but this gives me an error: MAP needs its key and value types specified, e.g. MAP<VARCHAR, VARCHAR>; a bare MAP is not accepted.
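
Declaring the map fully, as below, does parse, but it seems it would only work if every value under detail were a string; a nested object like additional-information wouldn't fit:

CREATE TABLE input_table (
    source VARCHAR,
    detail MAP<VARCHAR, VARCHAR>
)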

How do I refer to deeply nested JSON using PyFlink SQL?

Is what I am trying to do possible with SQL and the Table API, or do I need to use the DataStream API? I don't want to create a separate input stream for each event type.


Solution

  • The column detail could be declared as VARCHAR and input_table defined as follows (note that detail-type and time must be escaped with backticks, since the former contains a hyphen and the latter is a reserved keyword in Flink SQL):

    CREATE TABLE input_table (
        version VARCHAR,
        id VARCHAR,
        `detail-type` VARCHAR,
        source VARCHAR,
        account VARCHAR,
        `time` VARCHAR,
        region VARCHAR,
        detail VARCHAR
    ) WITH (
        ...
    )
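
    The connector properties in the WITH clause depend on where the events arrive from. As a hypothetical sketch, if EventBridge forwards the events to a Kinesis stream, it might look like the following (the stream name and region are placeholders, and the option names follow the Kinesis SQL connector):

    CREATE TABLE input_table (
        ...
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'eventbridge-events',    -- placeholder stream name
        'aws.region' = 'us-west-1',         -- placeholder region
        'scan.stream.initpos' = 'LATEST',
        'format' = 'json'
    )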
    

    Moreover, if you want to process the column detail, you can parse it as JSON in a Python UDF, as follows:

    import json

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.STRING())
    def get_id(detail):
        # detail arrives as a raw JSON string; parse it and return
        # whichever identifier field this event type carries.
        detail_json = json.loads(detail)
        if 'build-id' in detail_json:
            return detail_json['build-id']
        return detail_json['instance-id']
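
    To call the UDF from SQL it first has to be registered on the table environment. A minimal sketch, assuming a TableEnvironment named t_env and the input_table defined above (create_temporary_function exists from Flink 1.12 onwards; older releases use register_function instead):

    # Register the UDF under a SQL-callable name, then use it in a query.
    t_env.create_temporary_function("get_id", get_id)
    ids = t_env.sql_query(
        "SELECT source, get_id(detail) AS event_id FROM input_table")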