
Multiple FFI calls to a Haskell function with side-effects


I want to write a Haskell function that receives a vector and returns another one with the same size but delayed by a certain number of samples d (add zeroes at the start). Because the produced vector must have the same size, it must buffer the remaining samples so the next application of this function from C continues on with the previously buffered samples. To do so I implemented a ring-buffer concept and hooked it up to C through FFI.

delay :: Int -> V.Vector Float -> V.Vector Float
delay d inp = unsafePerformIO $ do
    ini <- MV.replicate d 0.0
    buf <- newIORef ini
    inx <- newIORef 0
    let f v = unsafePerformIO $ do 
        i <- readIORef inx
        b <- readIORef buf
        r <- MV.unsafeExchange b i v
        writeIORef buf b
        writeIORef inx ((i+1) `mod` d)
        return r
    return $ V.map f inp

Note that there are two types of vectors here: Data.StorableVector as V (not to be confused with Data.Vector.Storable), and Data.Vector.Storable.Mutable as MV for the ring buffer.

type Process = (V.Vector Float -> V.Vector Float)

foreign export ccall 
startCtx :: IO(StablePtr Process)
startCtx = newStablePtr $ delay 2

foreign export ccall 
freeCtx :: StablePtr Process -> IO()
freeCtx = freeStablePtr

foreign export ccall 
hs_process :: StablePtr Process -> Int -> Ptr Float -> Ptr Float -> IO()
hs_process pf ns i o = do
    f <- deRefStablePtr pf
    iv <- V.peek ns i
    V.poke o $ f iv

On the C side:

#include "Process_stub.h"
#include <cstdio>
#include <vector>

using namespace std;

extern "C" {
    void HsStart();
    void HsEnd();
}

vector<float> input1 = {1.0, 2.0, 3.0, 4.0, 5.0},
              input2 = {6.0, 7.0, 8.0, 9.0, 10.0},
              output(input1.size(), 0.0);

int main(int argc, char *argv[])
{   
    HsStart();

    auto pf = startCtx();

    hs_process(pf, input1.size(), input1.data(), output.data());

    for(int i = 0; i < input1.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);

    hs_process(pf, input2.size(), input2.data(), output.data());

    for(int i = 0; i < input2.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);

    freeCtx(pf);
    
    HsEnd();

    return 0;
}

What I expect:

First call of hs_process:
[0] input = 1    |    output = 0
[1] input = 2    |    output = 0
[2] input = 3    |    output = 1
[3] input = 4    |    output = 2
[4] input = 5    |    output = 3

Second call of hs_process:
[0] input = 6    |    output = 4
[1] input = 7    |    output = 5
[2] input = 8    |    output = 6
[3] input = 9    |    output = 7
[4] input = 10   |    output = 8

But what I get instead:

First call of hs_process:
[0] input = 1    |    output = 0
[1] input = 2    |    output = 0
[2] input = 3    |    output = 1
[3] input = 4    |    output = 2
[4] input = 5    |    output = 3

Second call of hs_process:
[0] input = 6    |    output = 0
[1] input = 7    |    output = 0
[2] input = 8    |    output = 6
[3] input = 9    |    output = 7
[4] input = 10   |    output = 8

I can see what I'm doing wrong, although I can't explain it properly: I'm only keeping the function, not the applicative per se. I would like to be able to keep every (possibly chained) delay call alive through some closure in the StablePtr.


Solution

  • If this is for production code, it makes more sense to write your delay function in C++.

    If this is just to figure out how to do it in Haskell, then understand that this sort of thing must be done in the IO monad and NOT with unsafe operations. (The operations are called unsafe precisely because using them results in the sort of weird behavior you're seeing. Function closures won't help here.)
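To make "done in the IO monad" concrete, here is a minimal sketch of the question's ring buffer with the state allocated once, in IO, instead of inside unsafePerformIO. The name newRingDelay is hypothetical and not part of the original code, the buffer is a plain ForeignPtr array from base rather than a mutable vector, and the sketch assumes d > 0.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrArray, withForeignPtr)
import Foreign.Marshal.Array (pokeArray)
import Foreign.Storable (peekElemOff, pokeElemOff)

-- Hypothetical helper (assumes d > 0): allocate the ring buffer once,
-- then hand back a per-sample step function that shares that buffer.
newRingDelay :: Int -> IO (Float -> IO Float)
newRingDelay d = do
  buf <- mallocForeignPtrArray d :: IO (ForeignPtr Float)
  withForeignPtr buf $ \p -> pokeArray p (replicate d 0)  -- start with d zeroes
  idx <- newIORef 0
  pure $ \x -> withForeignPtr buf $ \p -> do
    i   <- readIORef idx
    old <- peekElemOff p i            -- sample written d steps ago
    pokeElemOff p i x                 -- overwrite it with the new sample
    writeIORef idx ((i + 1) `mod` d)  -- advance the ring index
    pure old

-- Two consecutive blocks pushed through the same delay line.
demo :: IO ([Float], [Float])
demo = do
  step <- newRingDelay 2
  a <- mapM step [1, 2, 3, 4, 5]
  b <- mapM step [6, 7, 8, 9, 10]
  pure (a, b)
```

Because the buffer and index are created by running newRingDelay once, every later call to the returned step function sees the same state, which is what the second hs_process call in the question was missing.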

    The simplest way to implement delay in Haskell is to add a parameter to the function representing the "pipe" that's being used. This can be an opaque value to the C program, but on the Haskell side, it can be some direct representation of the sequence of pending, delayed elements.

    There's no real reason to use a ring buffer here. You just need a buffer for the "extra" elements, which we can represent as a storable array via Foreign.Marshal.Array:

    data Pipe = Pipe Int (Ptr Float)
    
    c_new_delay :: Int -> IO (StablePtr Pipe)
    c_new_delay d = do
      pipe <- Pipe d <$> newArray (replicate d 0)
      newStablePtr pipe
    

    Note that allocating a new delay pipe is an IO operation, so two invocations of c_new_delay with the same delay size (c_new_delay 2 followed by c_new_delay 2) can return two different pipes. We can run input through a delay pipe using a function that takes the pipe as its first argument. This, too, needs to be an IO operation rather than a pure one. Otherwise, it wouldn't be possible to pass the same pipe and input to the function in two consecutive calls and get different output each time.

    c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    c_run_delay pipe n input output = do
      Pipe d delayed <- deRefStablePtr pipe
      copyArray output delayed d
      copyArray (advancePtr output d) input (n-d)
      copyArray delayed (advancePtr input (n-d)) d
    

    Above, I've just used the copyArray primitive to copy slices between the input, the array of delayed elements, and the output. Note that this assumes n is at least d. With a final function to free a pipe:

    c_free_delay :: StablePtr Pipe -> IO ()
    c_free_delay pipe = do
      Pipe _ delayed <- deRefStablePtr pipe
      free delayed
      freeStablePtr pipe
    

    This is more or less a drop-in replacement for the interface your C++ code uses. The full code is:

    -- Delay.hs
    
    {-# LANGUAGE ForeignFunctionInterface #-}
    
    module Delay where
    
    import Foreign.Ptr
    import Foreign.StablePtr
    import Foreign.Marshal.Array
    import Foreign.Marshal.Alloc
    
    data Pipe = Pipe Int (Ptr Float)
    
    -- |Create a new delay pipe of given size, internally represented
    -- as a malloced C-compatible array of delayed items.
    c_new_delay :: Int -> IO (StablePtr Pipe)
    c_new_delay d = do
      pipe <- Pipe d <$> newArray (replicate d 0)
      newStablePtr pipe
    
    -- |Feed input to the delay pipe.  NOTE: input and output buffers
    -- must not overlap, and n must be at least the delay size d.
    c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    c_run_delay pipe n input output = do
      Pipe d delayed <- deRefStablePtr pipe
      copyArray output delayed d
      copyArray (advancePtr output d) input (n-d)
      copyArray delayed (advancePtr input (n-d)) d
    
    -- |Free a delay pipe.
    c_free_delay :: StablePtr Pipe -> IO ()
    c_free_delay pipe = do
      Pipe _ delayed <- deRefStablePtr pipe
      free delayed
      freeStablePtr pipe
    
    foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
    foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
    
    /* delay.cc */
    
    #include "Delay_stub.h"
    #include <vector>
    #include <cstdio>
    
    using namespace std;
    
    vector<float> input1 = {1.0, 2.0, 3.0, 4.0, 5.0},
                  input2 = {6.0, 7.0, 8.0, 9.0, 10.0},
                  output(input1.size(), 0.0);
    
    int main(int argc, char *argv[])
    {
        hs_init(&argc, &argv);
    
        auto pf = new_delay(2);
    
        run_delay(pf, input1.size(), input1.data(), output.data());
    
        for(int i = 0; i < input1.size(); i++)
            printf("[%d] output = %f\n", i, output[i]);
    
        run_delay(pf, input2.size(), input2.data(), output.data());
    
        for(int i = 0; i < input2.size(); i++)
            printf("[%d] output = %f\n", i, output[i]);
    
        free_delay(pf);
    
        hs_exit();  // shut down the GHC runtime started by hs_init
    
        return 0;
    }
    

    and compiling and running with:

    $ ghc -no-hs-main -o delay Delay.hs delay.cc && ./delay
    

    produces the expected output:

    [1 of 2] Compiling Delay            ( Delay.hs, Delay.o ) [Source file changed]
    [2 of 2] Linking delay [Objects changed]
    [0] output = 0.000000
    [1] output = 0.000000
    [2] output = 1.000000
    [3] output = 2.000000
    [4] output = 3.000000
    [0] output = 4.000000
    [1] output = 5.000000
    [2] output = 6.000000
    [3] output = 7.000000
    [4] output = 8.000000
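
One caveat on c_run_delay: its three copyArray calls only make sense when the block length n is at least the delay d. The following is a sketch of how a shorter block could be handled, assuming the same layout of the delayed array (oldest sample first). The name runDelayAny is hypothetical, and it takes d and the delayed pointer directly rather than a StablePtr so that it stays self-contained.

```haskell
import Foreign.Marshal.Alloc (free)
import Foreign.Marshal.Array
  (advancePtr, allocaArray, copyArray, moveArray, newArray, peekArray, withArray)
import Foreign.Ptr (Ptr)

-- Hypothetical generalisation of c_run_delay that also accepts n < d.
-- Input and output buffers must still not overlap.
runDelayAny :: Int -> Ptr Float -> Int -> Ptr Float -> Ptr Float -> IO ()
runDelayAny d delayed n input output
  | n >= d = do
      -- Same three copies as in c_run_delay.
      copyArray output delayed d
      copyArray (advancePtr output d) input (n - d)
      copyArray delayed (advancePtr input (n - d)) d
  | otherwise = do
      -- Emit the n oldest delayed samples.
      copyArray output delayed n
      -- Shift the remaining d-n samples to the front; source and
      -- destination overlap, so moveArray is used instead of copyArray.
      moveArray delayed (advancePtr delayed n) (d - n)
      -- Append the whole input block behind them.
      copyArray (advancePtr delayed (d - n)) input n

-- Feed blocks of length 2, 2 and 4 through a delay of 3 samples.
demo :: IO [[Float]]
demo = do
  delayed <- newArray [0, 0, 0]
  outs <- mapM (feed delayed) [[1, 2], [3, 4], [5, 6, 7, 8]]
  free delayed
  pure outs
  where
    feed delayed xs =
      let n = length xs
      in withArray xs $ \inp ->
           allocaArray n $ \out -> do
             runDelayAny 3 delayed n inp out
             peekArray n out
```

The first branch is unchanged from the answer's code; only the second branch is new, and it keeps the invariant that delayed always holds exactly the last d input samples.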
    

    Note that if you had an existing Haskell delay function meant to be called from Haskell code, it still wouldn't be written as a pure function. Instead, you'd use something like @chi's suggestion in the IO monad:

    delay :: Int -> IO (Vector Float -> IO (Vector Float))
    

    The justification here is that delay 2 can't return a pure value, or else the code:

    pipe1 = delay 2
    pipe2 = delay 2
    

    would end up defining pipe1 and pipe2 as the same pipe, and you want these to be independent pipes. By making delay 2 return an IO value, you can write some do-notation:

    pipe1 <- delay 2
    pipe2 <- delay 2
    

    and get the independent pipes you expect. Similarly, pipe1 itself can't be a pure Vector Float -> Vector Float function, or else running it twice on the same input:

    pipe1 [1,2,3,4,5]
    pipe1 [1,2,3,4,5]
    

    would produce the same result, and you want different results (first [0,0,1,2,3], and then [4,5,1,2,3]). So, again, by having it return an IO value, you can write:

    result1 <- pipe1 [1,2,3,4,5]
    result2 <- pipe1 [1,2,3,4,5]
    

    and reasonably expect to get two different results.
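
The reasoning above can be tried out with a small list-based sketch that needs only base. The name mkDelay is hypothetical, and plain lists stand in for vectors purely to keep the example short.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- Hypothetical list-based delay: each call to mkDelay allocates its own
-- IORef, so every pipe it returns carries independent state.
mkDelay :: Int -> IO ([Float] -> IO [Float])
mkDelay n = do
  r <- newIORef (replicate n 0)
  pure $ \xs -> atomicModifyIORef' r $ \held ->
    -- Output as many samples as came in; keep the remaining n for next time.
    let (out, keep) = splitAt (length xs) (held ++ xs)
    in  (keep, out)

demo :: IO ([Float], [Float], [Float])
demo = do
  pipe1 <- mkDelay 2
  pipe2 <- mkDelay 2
  result1 <- pipe1 [1, 2, 3, 4, 5]
  result2 <- pipe1 [1, 2, 3, 4, 5]  -- same pipe, same input, different result
  result3 <- pipe2 [1, 2, 3, 4, 5]  -- independent pipe, starts from zeroes
  pure (result1, result2, result3)
```

Here result1 and result3 are both [0,0,1,2,3], while result2 is [4,5,1,2,3], which is only possible because both creating a pipe and running it are IO actions.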

    Anyway, if you had an existing Haskell implementation of this form, like the following, which uses an IORef to hold an immutable vector of the delayed elements between calls:

    {-# LANGUAGE OverloadedLists #-}
    
    module Delay where
    
    import Data.IORef
    import qualified Data.Vector.Unboxed as V
    
    delay :: Int -> IO (V.Vector Float -> IO (V.Vector Float))
    delay n = do
      r <- newIORef (V.replicate n 0)
      pure $ \v -> atomicModifyIORef r $ \v_delayed ->
        let (v1, v2) = V.splitAt (V.length v - n) v
        in  (v2, v_delayed V.++ v1)
    
    test :: IO ()
    test = do
      d <- delay 2
      print =<< d [1,2,3,4,5]  -- output: [0.0,0.0,1.0,2.0,3.0]
      print =<< d [1,2,3,4,5]  -- output: [4.0,5.0,1.0,2.0,3.0]
    

    and wanted to expose a C API, you could use a StablePtr to the Vector Float -> IO (Vector Float) portion, similar to what you already tried. Since I'm using Data.Vector.Unboxed, I used slightly different code to marshal the vectors into and out of the C++ buffers.

    import Foreign.Ptr
    import Foreign.StablePtr
    import Foreign.Storable
    
    type Pipe = V.Vector Float -> IO (V.Vector Float)
    
    c_new_delay :: Int -> IO (StablePtr Pipe)
    c_new_delay d = delay d >>= newStablePtr
    
    c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    c_run_delay pipe n input output = do
      pipe' <- deRefStablePtr pipe
      input' <- V.generateM n (peekElemOff input)
      output' <- pipe' input'
      V.imapM_ (pokeElemOff output) output'
    
    c_free_delay :: StablePtr Pipe -> IO ()
    c_free_delay = freeStablePtr
    
    foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
    foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
    

    This could be substituted for the Delay.hs module above and would work fine with my posted delay.cc. The full alternative Delay.hs module is:

    -- Delay.hs, alternative version
    
    {-# LANGUAGE OverloadedLists #-}
    
    module Delay where
    
    import Data.IORef
    import qualified Data.Vector.Unboxed as V
    import Foreign.Ptr
    import Foreign.StablePtr
    import Foreign.Storable
    
    delay :: Int -> IO (V.Vector Float -> IO (V.Vector Float))
    delay n = do
      r <- newIORef (V.replicate n 0)
      pure $ \v -> atomicModifyIORef r $ \v_delayed ->
        let (v1, v2) = V.splitAt (V.length v - n) v
        in  (v2, v_delayed V.++ v1)
    
    test :: IO ()
    test = do
      d <- delay 2
      print =<< d [1,2,3,4,5]  -- output: [0.0,0.0,1.0,2.0,3.0]
      print =<< d [1,2,3,4,5]  -- output: [4.0,5.0,1.0,2.0,3.0]
    
    type Pipe = V.Vector Float -> IO (V.Vector Float)
    
    c_new_delay :: Int -> IO (StablePtr Pipe)
    c_new_delay d = delay d >>= newStablePtr
    
    c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    c_run_delay pipe n input output = do
      pipe' <- deRefStablePtr pipe
      input' <- V.generateM n (peekElemOff input)
      output' <- pipe' input'
      V.imapM_ (pokeElemOff output) output'
    
    c_free_delay :: StablePtr Pipe -> IO ()
    c_free_delay = freeStablePtr
    
    foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
    foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
    foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
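
The element-by-element marshalling in c_run_delay (generateM with peekElemOff, imapM_ with pokeElemOff) could also be done in bulk with peekArray and pokeArray from Foreign.Marshal.Array. The sketch below assumes a list-based pipe of type [Float] -> IO [Float] rather than the unboxed-vector Pipe above, and runOnBuffers is a hypothetical name.

```haskell
import Foreign.Marshal.Array (allocaArray, peekArray, pokeArray, withArray)
import Foreign.Ptr (Ptr)

-- Hypothetical wrapper: read n floats from the C input buffer, run a
-- list-based pipe over them, and write at most n results back out.
runOnBuffers :: ([Float] -> IO [Float]) -> Int -> Ptr Float -> Ptr Float -> IO ()
runOnBuffers pipe n input output = do
  xs <- peekArray n input
  ys <- pipe xs
  pokeArray output (take n ys)  -- never write past the end of the output buffer

-- Stand-in pipe that doubles every sample, run over temporary buffers.
demo :: IO [Float]
demo =
  withArray [1, 2, 3] $ \inp ->
    allocaArray 3 $ \out -> do
      runOnBuffers (pure . map (* 2)) 3 inp out
      peekArray 3 out
```

Going through lists costs an extra copy compared with writing into the buffers directly, so this is a convenience for small blocks rather than a performance recommendation.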