Search code examples
amazon-web-servicesamazon-kinesis-firehoseapache-iceberg

AWS Firehose (Java SDK) - unable to deliver data to Iceberg tables defined in Glue


The following AWS CLI command works fine:

aws firehose put-record --delivery-stream-name 52N-STA-DF-ICBG --cli-binary-format raw-in-base64-out --record='{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"
}'

However, when I try to do it using my Java application, Firehose is unsuccessful in delivering the record to the Iceberg table:

public void icebergMerge(ObjectNode dataNode, String tableName, String operation)
            throws STACRUDException {

        ObjectNode rootNode = mapper.createObjectNode();

        ObjectNode adf_metadata = rootNode.putObject(FirehoseConstants.ADF_METADATA);
        ObjectNode otf_metadata = adf_metadata.putObject(FirehoseConstants.OTF_METADATA);
        otf_metadata.put(FirehoseConstants.TABLE, tableName.toLowerCase());
        otf_metadata.put(FirehoseConstants.DATABASE, FirehoseConstants.DATABASE_NAME);
        otf_metadata.put(FirehoseConstants.OPERATION, operation);


        ObjectNode adf_record = rootNode.putObject(FirehoseConstants.ADF_RECORD);
        adf_record.setAll(dataNode);

        try {
            String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
            ObjectNode outerNode = mapper.createObjectNode();
            outerNode.put("Data", dataPayload);
            streamToFirehose(mapper.writeValueAsString(outerNode));
        } catch (JsonProcessingException e) {
            throw new STACRUDException("Bad request: cannot parse payload for table " + operation);
        }
    }

private void streamToFirehose(String jsonPayload) {
        try {
            Record record = Record.builder()
                    .data(SdkBytes.fromUtf8String(jsonPayload))
                    .build();

            PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                    .deliveryStreamName(FirehoseConstants.DELIVERY_STREAM_NAME)
                    .record(record)
                    .build();

            PutRecordResponse resp = firehoseClient.putRecord(putRecordRequest);
            LOGGER.debug("Record sent successfully to Firehose.");
        } catch (Exception e) {
            LOGGER.debug("Error sending to Firehose: " + e.getMessage());
        }
    }

dashboard

Here is what jsonPayload looks like in streamToFirehose() function as per the debugger:

{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}

Solution

  • The problem was in the formatting of the record. The putRecord API in Java automatically encapsulates the payload in { "Data": payload } format. Thus the outerNode object was redundant.