
Cloud Haskell hanging forever when sending messages to ManagedProcess


The Problem

Hello! I'm writing a simple Server - Worker program in Cloud Haskell. The problem is that when I try to call the ManagedProcess after the server discovery step, my example hangs forever, even when using callTimeout (which should give up after 100 ms). The code is very simple, but I cannot find anything wrong with it.

I've also posted the question on the mailing list, but from what I know of the SO community, I can get an answer a lot faster here. If I get an answer from the mailing list, I will post it here as well.

Source Code

The Worker.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Main where

import Network.Transport     (EndPointAddress(EndPointAddress))
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)
import Data.ByteString.Char8 (pack)
import System.Environment    (getArgs)

import qualified Server as Server

main = do
  [host, port, serverAddr] <- getArgs

  Right transport <- createTransport host port defaultTCPParameters
  node <- newLocalNode transport initRemoteTable

  let addr = EndPointAddress (pack serverAddr)
      srvID = NodeId addr

  _ <- forkProcess node $ do
    sid <- discoverServer srvID
    liftIO $ putStrLn "x"
    liftIO $ print sid
    r <- callTimeout sid (Server.Add 5 6) 100 :: Process (Maybe Double)
    liftIO $ putStrLn "x"
    liftIO $ threadDelay (10 * 1000 * 1000)


  threadDelay (10 * 1000 * 1000)
  return ()


discoverServer srvID = do
  whereisRemoteAsync srvID "serverPID"
  reply <- expectTimeout 100 :: Process (Maybe WhereIsReply)
  case reply of
    Just (WhereIsReply _ msid) -> case msid of
      Just sid -> return sid
      Nothing  -> discoverServer srvID
    Nothing                    -> discoverServer srvID

The Server.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Server where

import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)


data Add = Add Double Double
  deriving (Typeable, Generic)
instance Binary Add

launchServer :: Process ProcessId
launchServer = spawnLocal $ serve () (statelessInit Infinity) server >> return () where
  server = statelessProcess { apiHandlers            = [ handleCall_ (\(Add x y) -> liftIO (putStrLn "!") >> return (x + y)) ]
                            , unhandledMessagePolicy = Drop
                            }


main = do
  Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters
  node <- newLocalNode transport initRemoteTable
  _ <- forkProcess node $ do
    self <- getSelfPid
    register "serverPID" self

    liftIO $ putStrLn "x"
    mid <- launchServer
    liftIO $ putStrLn "y"
    r <- call mid (Add 5 6) :: Process Double
    liftIO $ print r
    liftIO $ putStrLn "z"
    liftIO $ threadDelay (10 * 1000 * 1000)
    liftIO $ putStrLn "z2"

  threadDelay (10 * 1000 * 1000)
  return ()

We can run them as follows:

runhaskell Server.hs
runhaskell Worker.hs 127.0.0.2 8080 127.0.0.1:8080:0

The Results

When we run the programs, we get the following results:

from Server:

x
y
!
11.0 -- this one shows that inside the same process we were able to use the "call" function
z
-- waiting - all the output above came from tests inside the server; now it waits for external messages

from Worker:

x
pid://127.0.0.1:8080:0:10 -- this is the process id of the server obtained with whereisRemoteAsync
-- waiting forever on the "callTimeout sid (Server.Add 5 6) 100" code!

As a side note, I've found that sending messages with send (from Control.Distributed.Process) and receiving them with expect works. But sending them with call (from Control.Distributed.Process.Platform) and trying to receive them with the ManagedProcess API handlers hangs the call forever (even when using callTimeout!).


Solution

  • Your client is getting an exception, which you cannot easily observe because you are running the client in a forkProcess. That is fine if you want to do it, but then you need to monitor or link to that process. In this case, simply using runProcess would be much simpler. If you do that, you will see this exception:

    Worker.hs: trying to call fromInteger for a TimeInterval. Cannot guess units
    

    callTimeout does not take an Integer; it takes a TimeInterval, which is constructed with the functions in the Time module. TimeInterval is a pseudo-Num: a bare literal such as 100 compiles, but it seems the type does not actually support fromInteger, so the literal fails at run time. I would consider that a bug, or at least bad form (in Haskell), but in any case the way to fix your code is simply

    r <- callTimeout sid (Server.Add 5 6) (milliSeconds 100) :: Process (Maybe Double)
    

    To fix the problem with the client calling into the server, you need to register the pid of the server process you spawned rather than the pid of the main process you spawn it from, i.e. change

    self <- getSelfPid
    register "serverPID" self
    
    liftIO $ putStrLn "x"
    mid <- launchServer
    liftIO $ putStrLn "y"
    

    to

    mid <- launchServer
    register "serverPID" mid
    liftIO $ putStrLn "y"
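
To illustrate the pseudo-Num point above, here is a minimal sketch using only base. The names Interval, Unit, millis and toMicros are hypothetical stand-ins invented for this example; they are not the library's actual TimeInterval definitions. The sketch only models the behaviour suggested by the error message: a Num instance whose fromInteger throws, so a bare literal type-checks but fails once it is evaluated.

```haskell
module Main where

import Control.Exception (ErrorCall, evaluate, try)

-- Hypothetical model of a unit-carrying interval (NOT the real TimeInterval).
data Unit = Millis | Seconds deriving (Show, Eq)

data Interval = Interval Unit Int deriving (Show, Eq)

-- Explicit constructor, playing the role that milliSeconds plays in the answer.
millis :: Int -> Interval
millis = Interval Millis

unsupported :: String -> a
unsupported op = error (op ++ " is not supported in this sketch")

-- A "pseudo-Num" instance: it exists, so the literal 100 type-checks as an
-- Interval, but fromInteger cannot know the unit and throws at run time.
instance Num Interval where
  fromInteger _ = error "fromInteger for an Interval: cannot guess units"
  _ + _         = unsupported "(+)"
  _ * _         = unsupported "(*)"
  abs _         = unsupported "abs"
  signum _      = unsupported "signum"
  negate _      = unsupported "negate"

-- A consumer that needs the unit, similar in spirit to a timeout parameter.
toMicros :: Interval -> Int
toMicros (Interval Millis n)  = n * 1000
toMicros (Interval Seconds n) = n * 1000000

main :: IO ()
main = do
  -- Explicit units: works as expected.
  print (toMicros (millis 100))
  -- Bare literal: compiles, but evaluating it throws an ErrorCall.
  r <- try (evaluate (toMicros 100)) :: IO (Either ErrorCall Int)
  case r of
    Left e  -> putStrLn ("literal failed at run time: " ++ show e)
    Right n -> print n
```

In the worker from the question, the equivalent failure happens inside a process started with forkProcess, so nothing is printed and the program just appears to hang until the main thread's threadDelay ends.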