Search code examples
c#concurrencyimmutabilityactormessage-passing

How can I get around my "object reference" issue when passing messages to an actor?


A while back I put together a simple class named Actor that was my implementation of the Actor Model. Since then I've used it with great success (Minus some annoying workarounds for the lack of a discriminated union type.). I am left with an issue that I am unsure of how to resolve without making the class clunky and slow.

When someone defines a message they are of course within their right to include a reference to an object that the caller themselves can manipulate. Even with the knowledge that I will pretty much be the only person using this class, this still bothers me.

A good example of a way around this is with Web Workers as implemented in Firefox. When passing an object to a worker it is serialized to JSON.

Any ideas?

public abstract class Actor<T, U> : IDisposable
{
    private const int AsyncChannelPoolSize = 20;

    private volatile bool _disposed;
    private readonly Thread _actorThread;
    private readonly AsyncReplyChannel<T, U> _messageChannel;
    private readonly Lazy<ObjectPool<AsyncChannel<U>>> _asyncChannelPool;


    public event EventHandler<ExceptionEventArgs> Exception;


    protected Actor()
    {
        _messageChannel = new AsyncReplyChannel<T, U>();
        _asyncChannelPool = new Lazy<ObjectPool<AsyncChannel<U>>>(() => new ObjectPool<AsyncChannel<U>>(AsyncChannelPoolSize));
        _actorThread = new Thread(ProcessMessages);
        _actorThread.IsBackground = true;
        _actorThread.Start();
    }


    public U PostWithReply(T value)
    {
        ThrowIfDisposed();

        var replyChannel = default(AsyncChannel<U>);
        var replyPackage = default(AsyncReplyPackage<T, U>);
        var replyMessage = default(U);

        try
        {
            replyChannel = _asyncChannelPool.Value.Get();
            replyPackage = new AsyncReplyPackage<T, U>(value, replyChannel);
            _messageChannel.Send(replyPackage);
            replyMessage = replyChannel.Receive();
        }
        finally
        {
            _asyncChannelPool.Value.Put(replyChannel);
        }

        return replyMessage;
    }

    public void PostWithAsyncReply(T value, IAsyncChannel<U> replyChannel)
    {
        ThrowIfDisposed();
        _messageChannel.Send(new AsyncReplyPackage<T, U>(value, replyChannel));
    }

    public void Dispose()
    {
        Dispose(true);
    }

    protected abstract void ProcessMessage(AsyncReplyPackage<T, U> package);

    protected virtual void OnException(Exception ex)
    {
        var exceptionEvent = Exception;
        if (exceptionEvent != null)
        {
            exceptionEvent(this, new ExceptionEventArgs(ex));
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        _disposed = true;
        _messageChannel.Dispose();
        if (_asyncChannelPool.IsValueCreated)
        {
            _asyncChannelPool.Value.Dispose();
        }
    }

    private void ProcessMessages()
    {
        var package = default(AsyncReplyPackage<T, U>);
        while (_messageChannel.TryReceive(out package) && !_disposed)
        {
            try
            {
                ProcessMessage(package);
            }
            catch (Exception ex)
            {
                OnException(ex);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
    }
}

Solution

  • Not that this is a good solution, but you could restrict T to be ICloneable and clone the object when you get it. That's more or less the ideological equivalent of the "serialize to JSON" approach you described. Needless to say, this would be clunky and slow.

    Really, you should just remember to keep messages immutable. C# isn't a language that can enforce this very well for you.