Search code examples
c#multithreadingproducer-consumerblockingqueue

Thread sending data to wrong client. Is this a thread safety issue?


I'm currently developing a server that deals with clients using a consumer/producer approach with threads and a blocking collection as shown below:

public class NetworkClient
{
    private bool _started = false;
    private int _clientId;
    private readonly Socket _socket;

    private static AutoResetEvent _lastMessageWasAckd = new AutoResetEvent(true);
    private static BlockingCollection<byte[]> _messageQueue = new BlockingCollection<byte[]>(); 
        

    public NetworkClient(Socket socket, int clientId)
    {
        this._socket = socket;
        this._clientId = clientId;
    }

    public int getClientID() {
        return _clientId;
    }

    public void SendMessage(string message)
    {
        Console.WriteLine("Adding to player's sending queue " + _clientId);
        _messageQueue.Add(Encoding.ASCII.GetBytes(message));
    }


    public void Start()
    {
        Thread receiver = new Thread(new ThreadStart(ReceivingThreadProc));
        Thread sender = new Thread(new ThreadStart(SendingThreadProc));
        receiver.Start();
        sender.Start();
        this._started = true;
    }
        
    public void Stop()
    {
        this._started = false;
    }

    private void ReceivingThreadProc()
    {
        byte[] bytes = new byte[1024];
        string data;
        try
        {
            while (_started && _socket.Connected)
            {
                int numByte = this._socket.Receive(bytes);
                data = Encoding.ASCII.GetString(bytes, 0, numByte);
                if (numByte == 0)
                {
                    break;
                }

                if (data == "ACK")
                {
                    _lastMessageWasAckd.Set();
                    continue;
                }

                // Acknowledge the message
                _socket.Send(Encoding.ASCII.GetBytes("ACK"));

                ServerReceiver.onEvent(this._clientId, data);
            }
        }
        catch (Exception e)
        {
            this._socket.Close();
        }
    }

    private void SendingThreadProc()
    {
        while (_started && _socket.Connected)
        {
            _lastMessageWasAckd.WaitOne();

            byte[] message = _messageQueue.Take();
                
            Console.WriteLine("Sending the following message to client number: " + _clientId);
            Console.WriteLine(System.Text.Encoding.ASCII.GetString(message));
                
            _socket.Send(message);

            _lastMessageWasAckd.Reset();
        }
    }
}

There will be an instance of NetworkClient created for every client that connects for the server. The issue is that sometimes a message is queued to be sent to client 1 (this is confirmed by the Console.Writeline in the SendMessage method) however that message is sent to client 0 (Shown by the console writeline in the SendingThreadProc method) instead. Is this due to a thread safety issue, or am I missing something entirely? This typically happens when two messages are sent right after one another.

Any help would be greatly appreciated.

EDIT:

As many people rightly pointed out I haven't added where I call SendMessage I'll put this class down below:

class NetworkServer
{
    private int latestClient = 0;
    private ServerReceiver _serverReceiver;
    private static readonly Dictionary<int, NetworkClient> clients = new Dictionary<int, NetworkClient>();
        
    public NetworkServer()
    {
        IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 5656);
        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        this._serverReceiver = new ServerReceiver();
        this._serverReceiver.start();

        try
        {
            listener.Bind(endpoint);
            listener.Listen(10);

            while (true)
            {
                Socket clientSocket = listener.Accept();
                this.OnClientJoin(clientSocket);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }
public static void SendClientMessage(int clientId, string package, CommandType commandType,
    Dictionary<string, object> data = null)
{
    if (data == null)
    {
        data = new Dictionary<string, object>();
    }
        
    SendClientMessageRaw(clientId, new NetworkCommand(clientId, package, commandType, data).ToJson());
}

public static void SendClientMessageRaw(int id, string message)
{
    Console.WriteLine("Supposed to send to client number " + clients[id].getClientID());
    clients[id].SendMessage(message);
}

private void OnClientJoin(Socket socket)
{
    // Add client to array, perform handshake?
    NetworkClient networkClient = new NetworkClient(socket, latestClient);
    clients.Add(latestClient, networkClient);
    Console.WriteLine("player added :" + latestClient);
    networkClient.Start();
    latestClient++;
        
    if (latestClient == 2)
    {
        SendClientMessage(1, "Test", CommandType.Ping, null);
    }
}

Solution

  • Could it be because your message queue is static and therefore shared between all NetworkClients? Meaning client A can pick up a message for client B?

    Might be as easy as removing static from the properties.