Tags: airflow, airflow-taskflow

Dynamic Task Mapping for http operator and query string


Let's say I have the following (simplified) DAG: one task returns a series of query parameter values, and I want to spawn a dynamically mapped HttpOperator task instance for each value, so that the requests look like http://example.com?key=value1, http://example.com?key=value2, etc. However, I'm not able to map the data field of the operator. Both attempts are shown below (the second is commented out so the snippet parses):

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import HttpOperator

@task
def get_values():
    return ["value1", "value2", "value3"]

@dag
def mydag() -> None:
    values = get_values()
    gets = HttpOperator.partial(task_id='gets', method='GET').expand(
        # Attempt 1: expands in the UI, but fails at runtime with
        # "ValueError: too many values to unpack (expected 2)"
        data={'key': values},
        # Attempt 2: DAG import error "TypeError: 'XComArg' object is not iterable"
        # data=[{'key': value} for value in values],
    )

I'm not sure how to set up the parameter to actually use the values object properly.

Worse, in the real DAG, data would normally also include additional query parameters, some of which come from other tasks (with a static mapping).


Solution

  • FWIW, I found a way to expand the query parameter over multiple values. It relies on Operator.expand_kwargs and XComArg.map:

    @task
    def get_values():
      return ["value1","value2","value3"]
    
    @dag
    def mydag() -> None:
        values = get_values()
        gets = HttpOperator.partial(task_id='gets', method='GET').expand_kwargs(
            values.map(lambda value: {'data': {'key': value}})
        )
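
    To make the shape of the mapping concrete, here is a minimal plain-Python sketch (no Airflow imports) of what the function passed to map produces: one kwargs dict per value, each of which becomes the keyword arguments of one mapped task instance. The names build_request_kwargs and extra_params, and the "format" parameter, are hypothetical and only for illustration. Merging in extra static query parameters is one possible way to handle the additional parameters mentioned in the question, not something the answer above covers.

```python
from typing import Any, Dict, List, Optional


def build_request_kwargs(
    value: str, extra_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Return the operator kwargs for a single mapped task instance."""
    # Copy the extra parameters so the caller's dict is never mutated.
    params = dict(extra_params or {})
    # The mapped value always goes under the 'key' query parameter.
    params["key"] = value
    # expand_kwargs expects one dict of operator arguments per instance.
    return {"data": params}


values = ["value1", "value2", "value3"]
# "format" is a made-up static query parameter used only for this example.
all_kwargs: List[Dict[str, Any]] = [
    build_request_kwargs(v, {"format": "json"}) for v in values
]

for kwargs in all_kwargs:
    print(kwargs)
```

    Inside the DAG, a function like this could presumably be passed to values.map(...) when the extra parameters are static. If they come from another task, one option is a small @task that receives both results and returns the full list of dicts, which can then be handed to expand_kwargs directly.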