Search code examples
rustasync-awaitfuturedroppinning

Implementing Drop for a Future in Rust


I have an async function (let's call is my_async_fn) that should support cancellation (I want to allow the caller to use tokio::time::timeout or futures::future::select with my_async_fn). I also need to perform some cleanup, regardless of whether my_async_fn was cancelled or not (I need to remove some contents in a file that my_async_fn writes).

So my approach was to write a future that forwards polling to an inner future and implements Drop by calling a provided FnMut:

trait OnDropFutureExt
where
    Self: Future + Sized,
{
    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D>;
}
impl<F: Future> OnDropFutureExt for F {
    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D> {
        OnDropFuture {
            inner: self,
            on_drop,
        }
    }
}


struct OnDropFuture<F: Future, D: FnMut()> {
    inner: F,
    on_drop: D,
}
impl<F: Future, D: FnMut()> OnDropFuture<F, D> {
    // See: https://doc.rust-lang.org/std/pin/#pinning-is-structural-for-field
    fn get_mut_inner(self: Pin<&mut Self>) -> Pin<&mut F> {
        unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
    }

    // See: https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field
    fn get_mut_on_drop(self: Pin<&mut Self>) -> &mut D {
        unsafe { &mut self.get_unchecked_mut().on_drop }
    }
}
impl<F: Future, D: FnMut()> Future for OnDropFuture<F, D> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        self.get_mut_inner().poll(cx)
    }
}
impl<F: Future, D: FnMut()> Drop for OnDropFuture<F, D> {
    fn drop(&mut self) {
        // See: https://doc.rust-lang.org/std/pin/#drop-implementation
        inner_drop(unsafe { Pin::new_unchecked(self) });
        fn inner_drop<F: Future, D: FnMut()>(this: Pin<&mut OnDropFuture<F, D>>) {
            this.get_mut_on_drop()();
        }
    }
}

This allows me to call my_async_fn like this:

my_async_fn().on_drop(|| clean_up_file(&file_path)).await?;

Now I have a few questions:

  1. Since I can't propagate an error from clean_up_file I guess the best I can do is to just log if I failed to clean up the file and continue. Or is there a better option?
  2. In https://doc.rust-lang.org/std/pin/#pinning-is-structural-for-field it sais:

You must make sure that you uphold the Drop guarantee: once your struct is pinned, the memory that contains the content is not overwritten or deallocated without calling the content’s destructors. This can be tricky, as witnessed by VecDeque: the destructor of VecDeque can fail to call drop on all elements if one of the destructors panics. This violates the Drop guarantee, because it can lead to elements being deallocated without their destructor being called. (VecDeque has no pinning projections, so this does not cause unsoundness.)

My future does have a pinning projection for the inner future, so do I have to make sure that on_drop never panics? Should I add something like std::panic::catch_unwind? How would I do that, since mutable references are not UnwindSafe?

  1. Since drop is a sync function I would implement clean_up_file as a sync function as well. It shouldn't block to long to clean up the file, so that shouldn't be a problem, right? Or is it better to use futures::executor::block_on? Would that not block the current thread and allow other tasks to run while waiting for the file to get cleaned up?

  2. Ideally I would want on_drop to be a FnOnce(), since it is only called once anyways. Is it safe to change the type of on_drop to ManuallyDrop<D> and implement Drop like this:

impl<F: Future, D: FnOnce()> Drop for OnDropFuture<F, D> {
    fn drop(&mut self) {
        // See: https://doc.rust-lang.org/std/pin/#drop-implementation
        inner_drop(unsafe { Pin::new_unchecked(self) });
        fn inner_drop<F: Future, D: FnOnce()>(this: Pin<&mut OnDropFuture<F, D>>) {
            let on_drop = unsafe { ManuallyDrop::take(this.get_mut_on_drop()) };
            on_drop()
        }
    }
}

According to https://stackoverflow.com/a/74914046/4149050 this should be safe right?


Solution

  • Since I can't propagate an error from clean_up_file I guess the best I can do is to just log if I failed to clean up the file and continue. Or is there a better option?

    Yes. Errors on Drop are problematic. For the error case, this is all you can do (if you don't want to panic). For the success case, provide a fn cleanup(self) method on the future that returns Result and should be called manually so you can handle the error (it should prevent running Drop of course).

    My future does have a pinning projection for the inner future, so do I have to make sure that on_drop never panics? Should I add something like std::panic::catch_unwind?

    No. VecDeque drops all elements itself, so an error can prevent it from dropping elements. But you are using the drop glue generated by the compiler. If on_drop will panic, the compiler will still run the drop glue for inner. Even if on_drop would be declared on the struct before inner, causing it to drop first, panicking on it then panicking on its destructor (a double panic) would not be a problem, as it would cause a hard abort.

    Since drop is a sync function I would implement clean_up_file as a sync function as well. It shouldn't block to long to clean up the file, so that shouldn't be a problem, right? Or is it better to use futures::executor::block_on? Would that not block the current thread and allow other tasks to run while waiting for the file to get cleaned up?

    Ideally, it would be an async function, but async drop is still not available (and it is questionable whether it ever will), so it must be sync.

    Ideally I would want on_drop to be a FnOnce(), since it is only called once anyways. Is it safe to change the type of on_drop to ManuallyDrop<D> and implement Drop like this:

    Yes.