
Data Fusion could not parse response from JSON


I am following the CDAP REST API reference to start a Data Fusion batch pipeline (GCS to GCS):

curl -w "\n" -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" \
"${CDAP_ENDPOINT}/v3/namespaces/default/apps/${PIPELINE_NAME}/workflows/DataPipelineWorkflow/start" \
-d "{ argfile : '${ARGUMENT_FILE}' }"  

Here, argfile is the runtime argument that fills the URL macro of the Argument Setter plugin.

The argument JSON file is:

{
  "arguments": [
    {
      "name": "input.path",
      "type": "string",
      "value": "gs://bucket/employee_sample.csv"
    },
    {
      "name": "directive",
      "type": "array",
      "value": [
        "parse-as-csv :body ',' true",
        "drop body"
      ]
    },
    {
      "name": "output.path",
      "type": "string",
      "value": "gs://bucket/FusionOutput"
    },
    {
      "name": "PROJECT_ID",
      "type": "string",
      "value": "project-id"
    },
    {
      "name": "output.schema",
      "type": "schema",
      "value": [
        {
          "name": "emp_id",
          "type": "string",
          "nullable": true
        },
        {
          "name": "name_prefix",
          "type": "string",
          "nullable": true
        },
        {
          "name": "first_name",
          "type": "string",
          "nullable": true
        }
      ]
    }
  ]
}

The problem is that the run fails with this error:

Pipeline 'gcs_fusion' failed.
io.cdap.cdap.api.macro.InvalidMacroException: Argument 'input.path' is not defined.
    at io.cdap.cdap.etl.common.DefaultMacroEvaluator.lookup(DefaultMacroEvaluator.java:54) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.plugin.MacroParser.findRightmostMacro(MacroParser.java:144)

java.lang.RuntimeException: Could not parse response from 'https://storage.cloud.google.com/bucket/argumentj.json': java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 2 column 1
    at io.cdap.plugin.ArgumentSetter.handleResponse(ArgumentSetter.java:83) ~[na:na]
    at io.cdap.plugin.http.HTTPArgumentSetter.run(HTTPArgumentSetter.java:76) ~[na:na]
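
The second exception suggests that the body fetched from the URL did not begin with a JSON object. Below is a minimal sketch for checking what an unauthenticated request to that URL returns, assuming a POSIX-style shell and curl. The function name starts_with_json_object is hypothetical and not part of CDAP. The check only looks at the first non-whitespace character and does not fully parse the JSON.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of CDAP or the Argument Setter plugin):
# succeeds only if the text on stdin, after leading whitespace,
# starts with '{', i.e. it could be a JSON object.
starts_with_json_object() {
  local body first
  body="$(cat)"
  # Strip leading whitespace, including newlines.
  first="${body#"${body%%[![:space:]]*}"}"
  case "$first" in
    '{'*) return 0 ;;
    *)    return 1 ;;
  esac
}

# Usage (needs network access, so it is left commented out):
#   curl -sL "${ARGUMENT_FILE}" | starts_with_json_object \
#     && echo "body starts like a JSON object" \
#     || echo "body is not a JSON object (possibly an HTML page)"
```

If the check fails for the URL passed as argfile, the plugin is probably receiving something other than the argument file itself.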

The same pipeline, built from the same template, succeeds in some runs when I pass the argument setter URL through the Data Fusion UI console instead.

The argument setter JSON follows the syntax prescribed in https://github.com/data-integrations/argument-setter, and the argument JSON file is set to public in the storage bucket.

It would be helpful if someone could resolve this.

Note that I've anonymized my project details here.


Solution

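  • The URL in the error, https://storage.cloud.google.com/..., is the authenticated browser endpoint. A request without a signed-in Google session is typically answered with an HTML sign-in page rather than the raw object, which would explain the "Expected BEGIN_OBJECT but was STRING" parse error. Assuming your bucket is publicly accessible, the URL you provide to the argument setter should have the following pattern: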

    https://storage.googleapis.com/[BUCKET_NAME]/[OBJECT_NAME]
    

    If the bucket is not publicly accessible, you will need to generate a signed URL for the argument file in Google Cloud Storage. See the Cloud Storage documentation on signed URLs for how to generate one.
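
The URL pattern above can be applied mechanically. Below is a minimal sketch, assuming the object is public and the original URL uses the storage.cloud.google.com host seen in the error message. The function name to_googleapis_url is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: rewrite a storage.cloud.google.com URL to the
# https://storage.googleapis.com/[BUCKET_NAME]/[OBJECT_NAME] pattern.
# Any other URL is printed unchanged.
to_googleapis_url() {
  case "$1" in
    https://storage.cloud.google.com/*)
      # Keep the "[BUCKET_NAME]/[OBJECT_NAME]" part and swap the host.
      printf 'https://storage.googleapis.com/%s\n' \
        "${1#https://storage.cloud.google.com/}"
      ;;
    *)
      printf '%s\n' "$1"
      ;;
  esac
}

to_googleapis_url "https://storage.cloud.google.com/bucket/argumentj.json"
# prints: https://storage.googleapis.com/bucket/argumentj.json
```

The rewritten URL can then be used as the value of ARGUMENT_FILE in the curl call from the question.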