Search code examples
haskellmemory-leakscpu-usagestream-fusion

Ever increasing CPU consumption with Haskell and stream-fusion


Here is a short Haskell program that generates a 440 Hz sound. It uses pulseaudio as an audio backend.

import GHC.Float
import Control.Arrow
import Sound.Pulse.Simple
import qualified Data.List.Stream as S
import Data.List

type Time = Double
type Frequency = Double
type Sample = Double
type CV = Double

chunksize = 441 * 2
sampleRate :: (Fractional a) => a
sampleRate = 44100

integral :: [Double] -> [Double]
integral = scanl1  (\acc x -> acc + x / sampleRate)

chunks :: Int -> [a] -> [[a]]
chunks n = S.takeWhile (not . S.null) . S.unfoldr (Just . S.splitAt n)

pulseaudioOutput :: [Sample] -> IO ()
pulseaudioOutput sx = do

    pa <- simpleNew Nothing "Synths" Play Nothing "Synths PCM output"
         (SampleSpec (F32 LittleEndian) 44100 1) Nothing Nothing

    mapM_ (simpleWrite pa . S.map double2Float) $ chunks 1000 sx

    simpleDrain pa
    simpleFree pa

oscSine :: Frequency -> [CV] ->  [Sample]
oscSine f = S.map sin <<< integral <<< S.map ((2 * pi * f *) . (2**))

music ::[Sample]
music = oscSine 440 (S.repeat 0)

main = do
    pulseaudioOutput music

If I compile and run this, I see an ever growing CPU consumption.

If I change "S.splitAt" to "splitAt" in the definition of "chunks", everything is fine.

Can anyone guess why this can be?

Thank you.

Update

In the following code all three version of chunks can produce the aforementioned behaviour:

import GHC.Float
import Control.Arrow
import Sound.Pulse.Simple
import Data.List.Stream

import Prelude hiding ( unfoldr
                      , map
                      , null
                      , scanl1
                      , takeWhile
                      , repeat
                      , splitAt
                      , drop
                      , take
                      )

type Time = Double
type Frequency = Double
type Sample = Double
type CV = Double

chunksize = 441 * 2
sampleRate :: (Fractional a) => a
sampleRate = 44100

integral :: [Double] -> [Double]
integral = scanl1  (\acc x -> acc + x / sampleRate)

chunks :: Int -> [a] -> [[a]]
--chunks n = takeWhile (not . null) . unfoldr (Just . splitAt n)
--chunks n xs = take n xs : chunks n (drop n xs)
chunks n xs = h : chunks n t
    where
        (h, t) = splitAt n xs

pulseaudioOutput :: [Sample] -> IO ()
pulseaudioOutput sx = do

    pa <- simpleNew Nothing "Synths" Play Nothing "Synths PCM output"
         (SampleSpec (F32 LittleEndian) 44100 1) Nothing Nothing

    mapM_ (simpleWrite pa . map double2Float) $ chunks 1000 sx

    simpleDrain pa
    simpleFree pa

oscSine :: Frequency -> [CV] ->  [Sample]
oscSine f = map sin <<< integral <<< map ((2 * pi * f *) . (2**))

music ::[Sample]
music = oscSine 440 (repeat 0)

main = do
    pulseaudioOutput music

I cleaned up the code to avoid mixing plain old lists and stream-fusion lists. The memory / cpu leak is still there. To see that the code is working on old lists, just remove the Prelude import and ".Stream" after "Data.List".


Solution

  • The splitAt on streams that is substituted by the fusion rules (http://hackage.haskell.org/package/stream-fusion-0.1.2.5/docs/Data-Stream.html#g:12) has the following signature:

    splitAt :: Int -> Stream a -> ([a], [a])
    

    From this we can see that since it produces lists and not streams, that obstructs further fusion. The correct thing to do, I think, is to produce either a splitAt that generates streams, or better yet to write a chunks function directly on streams with the appropriate fusion rules from the list version.

    Here is a splitAt on streams that I think should be good. You would of course need to pair it with the appropriate rewrite rules from a splitAt on lists, and if those rewrite rules get tricky, perhaps write the chunks function directly, though it seems a bit tricky to do so as well:

    splitAt :: Int -> Stream a -> (Stream a, Stream a)
    splitAt n0 (Stream next s0)
      | n0 < 0    = (nilStream, (Stream next s0))
      | otherwise = loop_splitAt n0 s0
      where
        nilStream = Stream (const Done) s0
        loop_splitAt  0 !s = (nilStream, (Stream next s))
        loop_splitAt !n !s = case next s of
          Done            -> (nilStream, nilStream)
          Skip    s'      -> loop_splitAt n s'
          Yield x s'      -> (cons x xs', xs'')
            where
              (xs', xs'') = loop_splitAt (n-1) s'