Search code examples
pythonorchestrationopen-telemetrydagsterobservability

Monitor/alert for orchesration in Dagster/Python, how to record spans?


I'm writing a data processing pipeline orchestrated in Dagster, and I'd like to add monitoring/alerting.

To simplify a use case, we process a few thousand small pieces of data, and each one could be processed by one of 4-5 different major workflows. I'd like to track the time it takes each piece of data to get fully processed and alert if any one takes > 1h. And I'd like to track how many pieces of data each workflow process and alert if any is too far from its normal value.

The challenge I'm coming up against is that OpenTelemetry expects spans be identified with context managers:

with tracer.start_as_current_span("span-name") as span:
    # do some work

However, my pipeline work is broken up into multiple Python functions, and the Dagster orchestration framework ties them together. In production the @ops will be run on separate Kubernetes nodes. Here's an example:

@op(
   out=DynamicOut(str),
)
def find_small_data_sets(context):
    """Starts 1 dataset going through the pipeline."""
    datasets = db.list_some_things()
    for dataset in datasets:
        yield DynamicOutput(value=data)


@op
def process_data_part_one(data: str) -> str:
    pass # Do some work on one of the data sets.


@op
def process_data_part_two(data: str) -> int:
    # Do more work on a data set.
    # conceptually would be part of the same span
    # as process_data_part_one


@op
def workflow_done(outputs: List[int]) -> int:
    # Finish up the workflow. Here is where a workflow-level
    # span might end.
    return sum(sizes)


@job
def do_full_orchestrated_job():
    """This function defines the DAG structure.

    It does not perform the actual runtime execution of my job
    when it gets called.
    """
    datasets = find_small_data_sets()
    processed_datasets = (
        datasets
        .map(process_data_part_one)
        .map(process_data_part_two)
    )
    workflow_done(processed_datasets.collect())

Given I don't have access to instrument the Dagster orchestration framework itself, is there a way I can use OpenTelemetry to monitor my pipeline? Would starting and ending spans in different functions (without a context manager) be possible, especially if the start and end are actually running on different CPUs? Or is there a better tool for this kind of monitoring/alerting?

Thanks for reading!


Solution

  • I ended up using custom trace propagation so that the Dagster job had one shared trace, and then each op had its own span. So I didn't (de)serialize spans, just the trace. See conceptual propagation docs and Python propagation docs.

    A simple example of Python trace context propagation:

    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    
    
    def get_trace_context():
        carrier = {}
        TraceContextTextMapPropagator().inject(carrier)
        return carrier
    
    
    def set_trace_context(carrier):
        """Sets the current trace context from the given propagated trace context carrier.    
        """
        ctx = TraceContextTextMapPropagator().extract(carrier)
        context.attach(ctx)
    

    For Dagster in particular there's no builtin way that's a great tool for storing trace contexts. Resources are re-initialized for each op execution context (such as each process with the default multiprocess executor) so a resource can't create a trace context that would be shared for all ops; you would end up with may different traces. So I built something custom that stores and fetches the trace context based on the run ID (or parent run ID). Here's a snippet for walking the run ID ancestry:

    def _get_run_id_and_ancestors(context):
        """Returns a list starting with the current run's ID and then any ancestor runs.
        """
        run_id = context.run_id
        run_ids = [run_id]
        while True:
            run = context.instance.get_run_by_id(run_id)
            if run.parent_run_id:
                run_id = run.parent_run_id
                run_ids.append(run_id)
            else:
                break
        return run_ids