Search code examples
c#serializationmemorystream

Networksimulation with memorystream


I have to simulate a Network with nodes and the MemoryStream, I want to send objects messages from one node to another.

The problem is I always get exception when I want to dezerialize the IMessage object in the stream. Also, how I can simulate the server.AcceptClient?

Exception:

System.Runtime.Serialization.SerializationException: "The input stream does not have a valid binary format. The start contents (in bytes) are: 05-01-00-00-00-22-50-65-65-72-5F-74-6F -5F-50-65-65 ... "

Code:

public class Network
{
    public List<Node> Nodes { get; set; }

    public Network()
    {
        Nodes = new List<Node>();
    }

    public MemoryStream GetClientStream(string ip)
    {
        foreach (var item in Nodes)
        {
            if (item.Ip == ip)
            {
                return item.Stream;
            }
        }

        return null;
    }
}


public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain { get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            Byte[] bytes = new Byte[256];

            Stream = new MemoryStream();
            // Enter the listening loop.
            while (true)
            {
                int i;

                while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
                {
                    Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(Stream);

                    msg.DoOperate();

                    Stream.Flush();
                }
            }
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {
            IMessage msg = new Message();

            MemoryStream stream = ChordChain.GetClientStream(ip);

            NodeTest.SerializeToStream(stream, msg);

            Console.WriteLine("Sent: {0}", "sd");
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch (Exception ex)
        {

        }


    }

}

public class NodeTest
{

    public static MemoryStream SerializeToStream(MemoryStream stream, object o)
    {
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);

        return stream;
    }

    public static IMessage DeserializeFromStream(MemoryStream stream)
    {

        stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }
}

public class Message : IMessage
{

    public void DoOperate()
    {
        Console.WriteLine("Hello");
        Console.ReadKey();
    }

}

public interface IMessage
{
    void DoOperate();
}

Thanks for you help but now i have the problem how i can send the fix size of the object thaqt it works correctly

public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain {get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            NamedPipeServerStream server = new NamedPipeServerStream(Ip);
            Byte[] bytes = new Byte[256];
            int i;

            while (true)
            {
                server.WaitForConnection();

                while ((i = server.Read(bytes, 0, bytes.Length)) != 0)
                {

                    //Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(server);

                    msg.DoOperate();
                }

            }

            //MemoryStream reader = new MemoryStream(server);
            //StreamWriter writer = new StreamWriter(server);

            //Byte[] bytes = new Byte[256];

            //Stream = new MemoryStream();
            //// Enter the listening loop.
            //while (true)
            //{
            //    int i;

            //    while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
            //    {
            //        //Stream.Position = 0;
            //        IMessage msg = NodeTest.DeserializeFromStream(Stream);

            //        msg.DoOperate();

            //        Stream.Flush();
            //    }               
            //}
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {              
            IMessage msg = new Message();


            NamedPipeClientStream client = new NamedPipeClientStream(ip);
            client.Connect();

            NodeTest.SerializeToStream(client, msg);

            Console.WriteLine("Sent: {0}", "sd");

            client.Close();
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch(Exception ex)
        {

        }  
    }

}

public class NodeTest
{

    public static void SerializeToStream(NamedPipeClientStream stream, object o)
    { 
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);
    }

    public static IMessage DeserializeFromStream(NamedPipeServerStream   stream)
    {

        //stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        //stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }  
}

Solution

  • Your use of MemoryStream isn't thread safe. You can't reposition and read the stream on one thread while another is writing to it. To model this type of asynchronous communication, use pipe streams:

     NamedPipeServerStream server = new NamedPipeServerStream("MyPipe");
     server.WaitForConnection();
     StreamReader reader = new StreamReader(server);
     StreamWriter writer = new StreamWriter(server);
    
     [...]
    
     NamedPipeClientStream client = new NamedPipeClientStream("MyPipe");
     client.Connect();
     StreamReader reader = new StreamReader(client);
     StreamWriter writer = new StreamWriter(client);