Search code examples
Tags: rust, trace, opentracing, rust-tracing

How to add traces/spans to async function?


I'm having trouble adding tracing to a particular function in my Rust project. Here are the relevant files:

// chain/ethereum/src/ingestor.rs

    /// Runs a single polling pass of the block ingestor.
    ///
    /// Reads the chain head pointer from the store, fetches the latest block
    /// header from the provider, logs how far the store is behind, and then
    /// ingests the latest block followed by every hash that `ingest_block`
    /// hands back, until it returns `None`.
    ///
    /// Returns early with `Ok(())` when the latest block equals the stored
    /// head, or when the provider reports a block number lower than the head.
    ///
    /// # Errors
    ///
    /// Propagates any `IngestorError` produced by the store lookup, the
    /// header fetch, or `ingest_block`.
    #[tracing::instrument(skip(self), name = "PollingBlockIngestor::do_poll")]
    async fn do_poll(&self) -> Result<(), IngestorError> {
        // Get chain head ptr from store
        let head_block_ptr_opt = self.chain_store.cheap_clone().chain_head_ptr().await?;

        // To check if there is a new block or not, fetch only the block header since that's cheaper
        // than the full block. This is worthwhile because most of the time there won't be a new
        // block, as we expect the poll interval to be much shorter than the block time.
        //
        // The span is attached to the future itself: a bare `info_span!(..);` statement
        // creates the span and drops it right away, so it would never cover the await below.
        // The fully qualified call avoids depending on `Instrument` being imported here.
        let latest_block =
            tracing::Instrument::instrument(self.latest_block(), info_span!("latest_block"))
                .await?;

        if let Some(head_block) = head_block_ptr_opt.as_ref() {
            // If latest block matches head block in store, nothing needs to be done
            if &latest_block == head_block {
                return Ok(());
            }

            if latest_block.number < head_block.number {
                // An ingestor might wait or move forward, but it never
                // wavers and goes back. More seriously, this keeps us from
                // later trying to ingest a block with the same number again
                warn!(self.logger,
                    "Provider went backwards - ignoring this latest block";
                    "current_block_head" => head_block.number,
                    "latest_block_head" => latest_block.number);
                return Ok(());
            }
        }

        // Compare latest block with head ptr, alert user if far behind
        match head_block_ptr_opt {
            None => {
                info!(
                    self.logger,
                    "Downloading latest blocks from Ethereum, this may take a few minutes..."
                );
            }
            Some(head_block_ptr) => {
                let latest_number = latest_block.number;
                let head_number = head_block_ptr.number;
                // Cannot be negative: the "went backwards" case returned above.
                let distance = latest_number - head_number;
                let blocks_needed = distance.min(self.ancestor_count);
                let code = if distance >= 15 {
                    LogCode::BlockIngestionLagging
                } else {
                    LogCode::BlockIngestionStatus
                };
                if distance > 0 {
                    info!(
                        self.logger,
                        "Syncing {} blocks from Ethereum",
                        blocks_needed;
                        "current_block_head" => head_number,
                        "latest_block_head" => latest_number,
                        "blocks_behind" => distance,
                        "blocks_needed" => blocks_needed,
                        "code" => code,
                    );
                }
            }
        }

        // Ingest the latest block, then keep following the hash returned by
        // `ingest_block` until it reports nothing further to fetch.
        let mut missing_block_hash = self.ingest_block(&latest_block.hash).await?;
        while let Some(hash) = missing_block_hash {
            missing_block_hash = self.ingest_block(&hash).await?;
        }
        Ok(())
    }

    async fn latest_block(&self) -> Result<BlockPtr, IngestorError> {
        info_span!("latest_block_header");
        self.eth_adapter
            .latest_block_header(&self.logger)
            .compat()
            .await
            .map(|block| block.into())
    }

and

// chain/ethereum/src/ethereum_adapter.rs

impl EthereumAdapterTrait for EthereumAdapter {
    /// Builds a boxed future that fetches the latest block (without
    /// transactions) from the Ethereum node, retrying without limit and
    /// applying the configured JSON-RPC timeout to each attempt.
    ///
    /// The `#[tracing::instrument]` span only covers this synchronous function,
    /// i.e. the construction of the future. The work done when the future is
    /// polled is covered by the `latest_block_header in eth adapter` span,
    /// which is attached to the future itself.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an `IngestorError` when the retry
    /// helper gives up with an inner error, or with a timeout error when no
    /// inner error is available.
    #[tracing::instrument(skip_all, name = "testest")]
    fn latest_block_header(
        &self,
        logger: &Logger,
    ) -> Box<dyn Future<Item = web3::types::Block<H256>, Error = IngestorError> + Send> {
        // Created while the function-level span is active; it is moved into
        // the future below instead of being left unused.
        let span = info_span!("latest_block_header in eth adapter");
        let web3 = self.web3.clone();

        let fetch_latest = retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
            .no_limit()
            .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
            .run(move || {
                let web3 = web3.cheap_clone();
                async move {
                    let block_opt = web3
                        .eth()
                        .block(Web3BlockNumber::Latest.into())
                        .await
                        .map_err(|e| {
                            anyhow!("could not get latest block from Ethereum: {}", e)
                        })?;

                    block_opt
                        .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                }
            })
            .map_err(move |e| {
                e.into_inner().unwrap_or_else(move || {
                    anyhow!("Ethereum node took too long to return latest block").into()
                })
            });

        // Instrument before boxing so the span is entered on every poll of the
        // retrying future. The fully qualified call avoids requiring a
        // `use tracing::Instrument` in this file.
        Box::new(
            tracing::Instrument::instrument(fetch_latest, span)
                .boxed()
                .compat(),
        )
    }
// lots of other functions
}

When I run this and send the traces to Jaeger, I don't see either of the spans created in the adapter's `latest_block_header` function. I see `PollingBlockIngestor::do_poll`, `latest_block`, and `latest_block_header`, but I see neither `testest` nor `latest_block_header in eth adapter`.

enter image description here

How do I correctly create spans for the latest_block_header function?

EDIT: I also tried modifying the `async move` block as shown below, calling `instrument` on the future, but that doesn't work either.

                    let web3 = web3.cheap_clone();
                    let s = info_span!("latest_block_header in eth adapter");
                    async move {
                        let block_opt = web3
                            .eth()
                            .block(Web3BlockNumber::Latest.into())
                            .instrument(s)
                            .await
                            .map_err(|e| {
                                anyhow!("could not get latest block from Ethereum: {}", e)
                            })?;

                        block_opt
                            .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                    }

Solution

  • It turns out my tracing filters were set up incorrectly. They were:

            let filter_layer = EnvFilter::try_new(
                "graph_chain_ethereum::ingestor,reqwest_tracing::reqwest_otel_span_builder",
            )
    

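    The filter only enables spans whose target matches one of the listed directives, so every span created in the `ethereum_adapter` module was being discarded. I needed to add `graph_chain_ethereum::ethereum_adapter` to the list, and then everything worked flawlessly.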