Search code examples
Tags: rust, trace, open-telemetry, opentracing

How to properly instrument `Box<dyn Future<...>>`


I'm adding tracing to my application, which contains quite a bit of legacy code. I've added lots of traces so far, but I'm having trouble with a few patterns. One pattern I'm having trouble with is the following:


    #[tracing::instrument(skip_all)]
    pub(crate) fn blocks_matching_polling_intervals(
        &self,
        logger: Logger,
        from: i32,
        to: i32,
        filter: &EthereumBlockFilter,
    ) -> Pin<
        Box<
            dyn std::future::Future<Output = Result<Vec<EthereumTrigger>, anyhow::Error>>
                + std::marker::Send,
        >,
    > {
        // Create a HashMap of block numbers to Vec<EthereumBlockTriggerType>
        let matching_blocks = (from..=to)
            .filter_map(|block_number| {
                filter
                    .polling_intervals
                    .iter()
                    .find_map(|(start_block, interval)| {
                        let has_once_trigger = (*interval == 0) && (block_number == *start_block);
                        let has_polling_trigger = block_number >= *start_block
                            && *interval > 0
                            && ((block_number - start_block) % *interval) == 0;

                        if has_once_trigger || has_polling_trigger {
                            let mut triggers = Vec::new();
                            if has_once_trigger {
                                triggers.push(EthereumBlockTriggerType::Start);
                            }
                            if has_polling_trigger {
                                triggers.push(EthereumBlockTriggerType::End);
                            }
                            Some((block_number, triggers))
                        } else {
                            None
                        }
                    })
            })
            .collect::<HashMap<_, _>>();

        let blocks_matching_polling_filter = self.load_ptrs_for_blocks(
            logger.clone(),
            matching_blocks.iter().map(|(k, _)| *k).collect_vec(),
        );

        let block_futures = blocks_matching_polling_filter.map(move |ptrs| {
            ptrs.into_iter()
                .flat_map(|ptr| {
                    let triggers = matching_blocks
                        .get(&ptr.number)
                        // Safe to unwrap since we are iterating over ptrs which was created from
                        // the keys of matching_blocks
                        .unwrap()
                        .iter()
                        .map(move |trigger| EthereumTrigger::Block(ptr.clone(), trigger.clone()));

                    triggers
                })
                .collect::<Vec<_>>()
        });

        block_futures.compat().boxed()
    }

    /// Loads the block pointers for every block number in `from..=to` (both ends inclusive)
    /// through `load_block_ptrs_rpc`, and returns them as one collected future.
    ///
    /// NOTE(review): `#[tracing::instrument]` on a non-`async` fn only enters its span while
    /// this body runs. The returned futures 0.1 future is polled later, outside that span.
    /// Wrapping it with `.instrument(...)` requires the `futures-01` feature of `tracing-futures`.
    #[tracing::instrument(skip_all)]
    pub(crate) fn block_range_to_ptrs(
        &self,
        logger: Logger,
        from: BlockNumber,
        to: BlockNumber,
    ) -> Box<dyn Future<Item = Vec<BlockPtr>, Error = Error> + Send> {
        debug!(&logger, "Requesting hashes for blocks [{}, {}]", from, to);

        // Not served from the DB on purpose: it may hold duplicate entries for one block number.
        let block_nums: Vec<BlockNumber> = (from..=to).collect();
        let ptrs = self.load_block_ptrs_rpc(logger, block_nums).collect();

        Box::new(ptrs)
    }

    fn load_block_ptrs_rpc(
        &self,
        logger: Logger,
        block_nums: Vec<BlockNumber>,
    ) -> impl Stream<Item = BlockPtr, Error = Error> + Send {
        let web3 = self.web3.clone();

        stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
            let span = info_span!("load_block_ptrs_rpc");
            let web3 = web3.clone();
            retry(format!("load block ptr {}", block_num), &logger)
                .no_limit()
                .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
                .run(move || {
                    let web3 = web3.clone();
                    async move {
                        let block = web3
                            .eth()
                            .block(BlockId::Number(Web3BlockNumber::Number(block_num.into())))
                            .boxed()
                            .await?;

                        block.ok_or_else(|| {
                            anyhow!("Ethereum node did not find block {:?}", block_num)
                        })
                    }
                })
                .instrument(span)
                .boxed()
                .compat()
                .from_err()
        }))
        .buffered(ENV_VARS.block_batch_size)
        .map(|b| b.into())
    }

`blocks_matching_polling_intervals` calls `block_range_to_ptrs`, which in turn calls `load_block_ptrs_rpc`. The first two functions have the `#[tracing::instrument]` attribute, while the third manages its spans manually. When I run this, the generated spans do not have the proper parent-child relationships.

[Screenshot of the trace view — image not included]

In the above screenshot we can see that blocks_matching_polling_intervals and block_range_to_ptrs both complete in ~60us and that load_block_ptrs_rpc is not a child of block_range_to_ptrs as it should be.

How can I properly instrument `blocks_matching_polling_intervals` and `block_range_to_ptrs`? I've tried several variants, both using the `.instrument()` combinator and creating/entering spans manually, but none of them worked well.


Solution

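  • It turns out I needed to enable the `futures-01` feature of the `tracing-futures` crate. By default, `tracing_futures::Instrumented` only implements `std::future::Future`, so it cannot wrap the futures 0.1 future returned by `block_range_to_ptrs`. With the feature enabled, the `.instrument(...)` combinator worked perfectly for `block_range_to_ptrs`.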