Search code examples
performancehaskellvectorfusion

How to insert a value into a sorted Vector in a single pass?


I have a Vector of sorted values, for example

fromList [1, 2, 4, 5]

Now I'd like to insert another value, let's say 3 and create a new vector. In an imperative language I'd allocate an array of size 5, loop over the original vector, copy the old values, and insert the new one at the proper point, so that I obtain

fromList [1, 2, 3, 4, 5]

I can do it using vector API as

let (pre, post) = span (< x) n
in V.concat [pre, pure x, post]

which works, but traverses the original vector twice: Once when searching for the split and once when combining it. Is there a way how to do it in just one pass? (Another solution would be to search for the splitting point using a binary search, but I'm interested if it's possible to create a genuine single-pass solution.)


Solution

  • user5402's answer is quite a pretty way to do it, but it falls prey to an efficiency problem described in the Data.Vector documentation. Specifically, once it has found the insertion spot, and is copying blindly, it no longer forces the values to actually be read from the source vector. Instead, it fills up the destination vector with thunks that, when forced, read from the source vector.

    Original solution

    Note: This is the first solution I came up with. It is pretty easy to understand, but it does not play well with the stream fusion system in vector, which can lead to relatively poor performance. The solutions below are better in general.

    One solution, as explained in the documentation, is to use the monadic indexM operation to perform these blind reads. This forces the read to be performed, but does not force the read value. Thus it copies a pointer (possibly a pointer to a thunk) from the source vector to the destination vector. For greatest efficiency, everything below should be replaced with its unsafe variant (unsafeIndexM, unsafeIndex, and unsafeWrite in particular).

    {-# Language ScopedTypeVariables #-}
    module Insert where
    import qualified Data.Vector as V
    import Data.Vector (Vector)
    import qualified Data.Vector.Mutable as MV
    import Data.Vector.Mutable (MVector)
    import Control.Monad.ST
    
    insertElem :: forall a . Ord a => a -> Vector a -> Vector a
    insertElem e v = V.create act
     where
      act :: forall s . ST s (MVector s a)
      act = do
        mv <- MV.new (V.length v + 1)
        let
          start :: Int -> ST s (MVector s a)
          start i
            | i == V.length v ||
              e <= v V.! i = MV.write mv i e >> finish i
            | otherwise = MV.write mv i (v V.! i) >> start (i+1)
          finish :: Int -> ST s (MVector s a)
          finish i
            | i == V.length v = return mv
            | otherwise = do
                 V.indexM v i >>= MV.write mv (i+1)
                 finish (i+1)
          in start 0
    
    insertElemInt :: Int -> Vector Int -> Vector Int
    insertElemInt = insertElem
    

    Note that naming the act action and using ScopedTypeVariables should not actually be necessary, but I found them tremendously helpful in tracking down my mistakes.

    Stream-based solutions

    The above code won't work well with stream fusion, because indices fly all over the place. The following approach should fuse properly, and still avoid creating unnecessary thunks. This is the first time I've ever touched stream fusion code, so it may be that some things could be improved. The only problem with this first stream-based version is that if it does fuse, then the step function for the input stream will be run twice on one of the elements. This is normally not a problem, but if the step function is extremely expensive (rare), it could be. I therefore give an alternative that should work better in that case. I'm not sure just when which of these will be better in practice, so I'm including both.

    With either of these stream-based solutions, the code

    testEnum :: Word -> Word -> Word -> Word
    testEnum ins low high = V.product $
      insertElem ins $ V.enumFromStepN low 1 (fromIntegral high)
    

    will compile into loops that run in constant space, never actually creating a vector at all.

    Step one seed twice

    {-# Language ScopedTypeVariables #-}
    module Insert where
    
    import Data.Vector (Vector)
    import Data.Word (Word)
    import qualified Data.Vector.Fusion.Stream.Monadic as S
    import qualified Data.Vector.Generic as G
    import Data.Vector.Fusion.Util (Id(..))
    
    -- To check on unboxing and such
    insertElemWord :: Word -> Vector Word -> Vector Word
    insertElemWord = insertElem
    
    {-# INLINE insertElem #-}
    insertElem :: forall a . Ord a => a -> Vector a -> Vector a
    insertElem a v = G.unstream (insertElemS a (G.stream v))
    
    {-# INLINE [1] insertElemS #-}
    insertElemS :: forall a . Ord a => a -> S.Stream Id a -> S.Stream Id a
    insertElemS e (S.Stream step (state::s) size) = S.Stream step' (state, False) (size + 1)
      where
        {-# INLINE [0] step' #-}
        step' :: (s, Bool) -> Id (S.Step (s, Bool) a)
        step' (s, True) = Id $ case unId (step s) of
           S.Yield a s' -> S.Yield a (s', True)
           S.Skip s' -> S.Skip (s', True)
           S.Done -> S.Done
        step' (s, False) = Id $ case unId (step s) of
           S.Yield a s' ->
              if e <= a
              then S.Yield e (s, True)
              else S.Yield a (s', False)
           S.Skip s' -> S.Skip (s', False)
           S.Done -> S.Yield e (s, True)
    

    One pass exactly, never repeating a lookup/step

    {-# Language ScopedTypeVariables #-}
    module Insert where
    
    import Data.Vector (Vector)
    import Data.Word (Word)
    import qualified Data.Vector.Fusion.Stream.Monadic as S
    import qualified Data.Vector.Generic as G
    import Data.Vector.Fusion.Util (Id(..))
    
    data Status s a = Pre s | During s a | Post s | End
    
    insertElemWord :: Word -> Vector Word -> Vector Word
    insertElemWord = insertElem
    
    {-# INLINE insertElem #-}
    insertElem :: forall a . Ord a => a -> Vector a -> Vector a
    insertElem a v = G.unstream (insertElemS a (G.stream v))
    
    {-# INLINE [1] insertElemS #-}
    insertElemS :: forall a . Ord a => a -> S.Stream Id a -> S.Stream Id a
    insertElemS e (S.Stream step (state::s) size) = S.Stream step' (Pre state) (size+1)
      where
        {-# INLINE [0] step' #-}
        step' :: Status s a -> Id (S.Step (Status s a) a)
        step' (Post s) = Id $ case unId (step s) of
          S.Yield a s' -> S.Yield a (Post s')
          S.Skip s' -> S.Skip (Post s')
          S.Done -> S.Done
        step' (Pre s) = Id $ case unId (step s) of
          S.Yield a s'
            | e <= a    -> S.Yield e (During s' a)
            | otherwise -> S.Yield a (Pre s')
          S.Skip s' -> S.Skip (Pre s')
          S.Done -> S.Yield e End
        step' (During s a) = Id (S.Yield a (Post s))
        step' End = Id S.Done