I am using the CDAP reference to start a Data fusion batch pipeline(GCS to GCS).
curl -w "\n" -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" \
"${CDAP_ENDPOINT}/v3/namespaces/default/apps/${PIPELINE_NAME}/workflows/DataPipelineWorkflow/start" \
-d "{ argfile : '${ARGUMENT_FILE}' }"
where argfile is the input URL to the argument setter macro
The argument json file is:
{
"arguments": [
{
"name": "input.path",
"type": "string",
"value": "gs://bucket/employee_sample.csv"
},
{
"name": "directive",
"type": "array",
"value": [
"parse-as-csv :body ',' true",
"drop body"
]
},
{
"name": "output.path",
"type": "string",
"value": "gs://bucket/FusionOutput"
},
{
"name": "PROJECT_ID",
"type": "string",
"value": "project-id"
},
{
"name": "output.schema",
"type": "schema",
"value": [
{
"name": "emp_id",
"type": "string",
"nullable": true
},
{
"name": "name_prefix",
"type": "string",
"nullable": true
},
{
"name": "first_name",
"type": "string",
"nullable": true
}
]
}
]
}
The problem here is that I am receiving this error
Pipeline 'gcs_fusion' failed.
io.cdap.cdap.api.macro.InvalidMacroException: Argument 'input.path' is not defined.
at io.cdap.cdap.etl.common.DefaultMacroEvaluator.lookup(DefaultMacroEvaluator.java:54) ~[na:na]
at io.cdap.cdap.internal.app.runtime.plugin.MacroParser.findRightmostMacro(MacroParser.java:144)
java.lang.RuntimeException: Could not parse response from 'https://storage.cloud.google.com/bucket/argumentj.json': java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 2 column 1
at io.cdap.plugin.ArgumentSetter.handleResponse(ArgumentSetter.java:83) ~[na:na]
at io.cdap.plugin.http.HTTPArgumentSetter.run(HTTPArgumentSetter.java:76) ~[na:na]
The same pipeline works with same template in some runs when I try to pass the argument setter URL through the Data Fusion UI Console.
The argument setter JSON follows the syntax prescribed in https://github.com/data-integrations/argument-setter and the argument json file is set to public in storage bucket.
Would be helpful if someone could resolve this.
Note that I've anonymized my project details here
Assuming your bucket is publicly accessible, then the URL you want to provide to the argument setter have the following pattern:
https://storage.googleapis.com/[BUCKET_NAME]/[OBJECT_NAME]
If the bucket is not publicly accessible, you will need to generate a signed URL for the argument file in Google Cloud Storage. You can read more about how to generate signed URL here.