I am trying to use Azure Data Factory to read data from a FHIR server and transform the results into newline delimited JSON (ndjson) files in Azure Blob storage. Specifically, if you query a FHIR server, you might get something like:
{
"resourceType": "Bundle",
"id": "som-id",
"type": "searchset",
"link": [
{
"relation": "next",
"url": "https://fhirserver/?ct=token"
},
{
"relation": "self",
"url": "https://fhirserver/"
}
],
"entry": [
{
"fullUrl": "https://fhirserver/Organization/1234",
"resource": {
"resourceType": "Organization",
"id": "1234",
// More fields
},
{
"fullUrl": "https://fhirserver/Organization/456",
"resource": {
"resourceType": "Organization",
"id": "456",
// More fields
},
// More resources
]
}
Basically a bundle of resources. I would like to transform that into a newline delimited (aka ndjson) file where each line is just the json for a resource:
{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources
I am able to get the REST connector set up and it can query the FHIR server (including pagination), but no matter what I try I cannot seem to generate the ouput I want. I set up an Azure Blob storage dataset:
{
"name": "AzureBlob1",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "setOfObjects"
},
"fileName": "myout.json",
"folderPath": "outfhirfromadf"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
And configure a copy activity:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Copy Data1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"schemaMapping": {
"resource": "resource"
},
"collectionReference": "$.entry"
}
},
"inputs": [
{
"referenceName": "FHIRSource",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
]
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
But at the end (in spite of configuring the schema mapping), it the end result in the blob is always just the original bundle returned from the server. If I configure the output blob as being a comma delimited text, I can extract fields and create a flattened tabular view, but that is not really what I want.
Any suggestions would be much appreciated.
So I sort of found a solution. If I do the original step of converting where the bundles are simply dumped in the JSON file and then do a nother conversion from the JSON file to what I pretend to be a text file into another blob, I can get the njson file created.
Basically, define another blob dataset:
{
"name": "AzureBlob2",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"structure": [
{
"name": "Prop_0",
"type": "String"
}
],
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"rowDelimiter": "",
"quoteChar": "",
"nullValue": "\\N",
"encodingName": null,
"treatEmptyAsNull": true,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": "myout.json",
"folderPath": "adfjsonout2"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Note that this one TextFormat
and also note that the quoteChar
is blank. If I then add another Copy Activity:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Copy Data1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"schemaMapping": {
"['resource']": "resource"
},
"collectionReference": "$.entry"
}
},
"inputs": [
{
"referenceName": "FHIRSource",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
]
},
{
"name": "Copy Data2",
"type": "Copy",
"dependsOn": [
{
"activity": "Copy Data1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"columnMappings": {
"resource": "Prop_0"
}
}
},
"inputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob2",
"type": "DatasetReference"
}
]
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Then it all works out. It is not ideal in that I now have two copies of the data in blobs, but one can easily be deleted, I suppose.
I would still love to hear about it if somebody has a one-step solution.