Tags: azure, azure-data-factory, hl7-fhir

Using ADF REST connector to read and transform FHIR data


I am trying to use Azure Data Factory to read data from a FHIR server and transform the results into newline delimited JSON (ndjson) files in Azure Blob storage. Specifically, if you query a FHIR server, you might get something like:

{
    "resourceType": "Bundle",
    "id": "som-id",
    "type": "searchset",
    "link": [
        {
            "relation": "next",
            "url": "https://fhirserver/?ct=token"
        },
        {
            "relation": "self",
            "url": "https://fhirserver/"
        }
    ],
    "entry": [
        {
            "fullUrl": "https://fhirserver/Organization/1234",
            "resource": {
                "resourceType": "Organization",
                "id": "1234",
                // More fields
        },
        {
            "fullUrl": "https://fhirserver/Organization/456",
            "resource": {
                "resourceType": "Organization",
                "id": "456",
                // More fields
        },

        // More resources
    ]
}

Basically, a bundle of resources. I would like to transform that into a newline-delimited JSON (ndjson) file, where each line is just the JSON for a single resource:

{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources

I am able to get the REST connector set up and it can query the FHIR server (including pagination), but no matter what I try I cannot seem to generate the output I want. I set up an Azure Blob storage dataset:

{
    "name": "AzureBlob1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": "myout.json",
            "folderPath": "outfhirfromadf"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
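
For completeness, the AzureBlobStorage1 linked service referenced above is just a standard Azure Blob storage linked service. A minimal sketch would be something along these lines (the connection string placeholder is illustrative only):

{
    "name": "AzureBlobStorage1",
    "properties": {
        "type": "AzureBlobStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>"
        }
    },
    "type": "Microsoft.DataFactory/factories/linkedservices"
}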

And configured a copy activity:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "resource": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
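
For reference, the FHIRSource dataset referenced by the copy activity is just a REST (RestResource) dataset pointed at the FHIR endpoint. A rough sketch is below; the linked service name, relative URL, and pagination rule are illustrative assumptions (depending on the connector version the pagination rule may need to live on the source instead, and the JSONPath may need adjusting for how your server returns the next link):

{
    "name": "FHIRSource",
    "properties": {
        "linkedServiceName": {
            "referenceName": "FHIRRestService",
            "type": "LinkedServiceReference"
        },
        "type": "RestResource",
        "typeProperties": {
            "relativeUrl": "Organization",
            "paginationRules": {
                "AbsoluteUrl": "$.link[?(@.relation == 'next')].url"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}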

But in spite of configuring the schema mapping, the end result in the blob is always just the original bundle returned from the server. If I configure the output blob as comma-delimited text, I can extract fields and create a flattened tabular view, but that is not really what I want.

Any suggestions would be much appreciated.


Solution

  • So I sort of found a solution. If I keep the original step, where the bundles are simply dumped into the JSON file, and then do another conversion from that JSON file to what I pretend is a text file in another blob, I can get the ndjson file created.

    Basically, define another blob dataset:

    {
        "name": "AzureBlob2",
        "properties": {
            "linkedServiceName": {
                "referenceName": "AzureBlobStorage1",
                "type": "LinkedServiceReference"
            },
            "type": "AzureBlob",
            "structure": [
                {
                    "name": "Prop_0",
                    "type": "String"
                }
            ],
            "typeProperties": {
                "format": {
                    "type": "TextFormat",
                    "columnDelimiter": ",",
                    "rowDelimiter": "",
                    "quoteChar": "",
                    "nullValue": "\\N",
                    "encodingName": null,
                    "treatEmptyAsNull": true,
                    "skipLineCount": 0,
                    "firstRowAsHeader": false
                },
                "fileName": "myout.json",
                "folderPath": "adfjsonout2"
            }
        },
        "type": "Microsoft.DataFactory/factories/datasets"
    }
    

    Note that this one uses TextFormat, and also note that the quoteChar is blank. If I then add another Copy Activity:

    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Copy Data1",
                    "type": "Copy",
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "typeProperties": {
                        "source": {
                            "type": "RestSource",
                            "httpRequestTimeout": "00:01:40",
                            "requestInterval": "00.00:00:00.010"
                        },
                        "sink": {
                            "type": "BlobSink"
                        },
                        "enableStaging": false,
                        "translator": {
                            "type": "TabularTranslator",
                            "schemaMapping": {
                                "['resource']": "resource"
                            },
                            "collectionReference": "$.entry"
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "FHIRSource",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "AzureBlob1",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "Copy Data2",
                    "type": "Copy",
                    "dependsOn": [
                        {
                            "activity": "Copy Data1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "typeProperties": {
                        "source": {
                            "type": "BlobSource",
                            "recursive": true
                        },
                        "sink": {
                            "type": "BlobSink"
                        },
                        "enableStaging": false,
                        "translator": {
                            "type": "TabularTranslator",
                            "columnMappings": {
                                "resource": "Prop_0"
                            }
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "AzureBlob1",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "AzureBlob2",
                            "type": "DatasetReference"
                        }
                    ]
                }
            ]
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }
    

    Then it all works out. It is not ideal in that I now have two copies of the data in blobs, but one can easily be deleted, I suppose.
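
    The intermediate blob could, for instance, be cleaned up by chaining a Delete activity after Copy Data2. A rough sketch of such an activity is below (I have not verified it; the exact properties may vary with the Delete activity version):

    {
        "name": "Delete intermediate output",
        "type": "Delete",
        "dependsOn": [
            {
                "activity": "Copy Data2",
                "dependencyConditions": [
                    "Succeeded"
                ]
            }
        ],
        "typeProperties": {
            "dataset": {
                "referenceName": "AzureBlob1",
                "type": "DatasetReference"
            },
            "enableLogging": false
        }
    }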

    I would still love to hear about it if somebody has a one-step solution.