Tags: c#, azure, websocket, protobuf-net, azure-web-pubsub

Azure Web PubSub with protobuf data not working


I am using a generic WebSocket client (System.Net.WebSockets) to connect to the Azure Web PubSub service and communicate between two parties. I followed the official documentation. Communication works with JSON, but with protobuf I don't receive any data. Below is the sample code I am using:

EDIT: The client now receives a message (after I changed the parsed class type to 'DownstreamMessage'), but it is a disconnect message: "disconnectedMessage": { "reason": "Invalid payload. Invalid value : '': Invalid event name. Valid event name should be word in between 1 and 128 characters long. I don't understand what is happening.

using System.Net.WebSockets;
using Google.Protobuf;

// Create a joining message
UpstreamMessage joiningMessage = new()
{
    JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
    {
        Group = Group,
    }
};
var joinData = joiningMessage.ToByteArray();

// Initialize the WebSocket
ClientWebSocket webSocket = new();
webSocket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await webSocket.ConnectAsync(new Uri(connStr), CancellationToken.None);

// Set up a continuous message receiver
_ = Task.Run(async () =>
{
    while (webSocket.State == WebSocketState.Open)
    {
        var buffer = new byte[8192];
        var result = await webSocket.ReceiveAsync(
                                    new ArraySegment<byte>(buffer),
                                    CancellationToken.None);

        if (result.MessageType == WebSocketMessageType.Binary)
        {
            // Create a properly sized array containing only the actual message
            var messageBytes = new byte[result.Count];
            Array.Copy(buffer, 0, messageBytes, 0, result.Count);

            try
            {
                var msg = DownstreamMessage.Parser.ParseFrom(messageBytes);
                Console.WriteLine($"Received message: {msg}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error parsing message: {ex.Message}");
            }
        }
        else if (result.MessageType == WebSocketMessageType.Close)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            break;
        }
    }
});

// Wait for connection establishment
await Task.Delay(2000);

// Send join request
await webSocket.SendAsync(joinData, WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Join group message sent");

// Wait for join to be processed
await Task.Delay(1000);

// Create and send data message
UpstreamMessage dataMessage = new()
{
    // Set the EventMessage with proper target group
    EventMessage = new UpstreamMessage.Types.EventMessage()
    {
        Data = new MessageData()
        {
            TextData = "Hello from protobuf dheeraj"
        }
    }
};

await webSocket.SendAsync(dataMessage.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Data message sent");

Solution

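  • The error you encountered, Invalid event name, indicates that the Protobuf message structure was incorrect: the invalid value quoted in the error is an empty string, which means the EventMessage reached the service without an event name. I have ensured that EventMessage properly sets the Data field.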
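
    For a raw ClientWebSocket client like the one in the question, a minimal sketch of the fix could look like the following. It assumes the same generated classes the question uses (UpstreamMessage, MessageData) and that the generated EventMessage and SendToGroupMessage types expose Event, Group and Data properties, as they would if generated from the service's published protobuf definition. The class name UpstreamMessages and the method names are hypothetical. Setting Event should avoid the Invalid event name disconnect, but an EventMessage is routed to the server-side event handler rather than to other clients, so reaching the other party in the group would likely need a SendToGroupMessage, which in turn requires the connection to have permission to send to that group.

    ```csharp
    using Google.Protobuf;

    // Assumption: UpstreamMessage and MessageData are the classes generated
    // from the service's protobuf definition, as used in the question.
    static class UpstreamMessages
    {
        // Custom event for the server-side event handler. The event name
        // must be non-empty, otherwise the service closes the connection.
        public static UpstreamMessage BuildEvent(string eventName, string text) => new()
        {
            EventMessage = new UpstreamMessage.Types.EventMessage
            {
                Event = eventName,
                Data = new MessageData { TextData = text }
            }
        };

        // Message published directly to the members of a group.
        public static UpstreamMessage BuildSendToGroup(string group, string text) => new()
        {
            SendToGroupMessage = new UpstreamMessage.Types.SendToGroupMessage
            {
                Group = group,
                Data = new MessageData { TextData = text }
            }
        };
    }
    ```

    Either message can then be serialized with ToByteArray() and sent over the socket as a binary frame, in the same way as the join message in the question.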

    Use BinaryData.FromBytes(message.ToByteArray()) to serialize messages, as below.

    await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(message.ToByteArray()), WebPubSubDataType.Binary);
    

    Below is the sample code for Azure Web PubSub with protobuf:

        private const string Group = "protobuf-client-group";
        // Client access URL of the service; renamed so it does not read like System.Uri.
        private const string ServiceUri = "wss://";
        static async Task Main()
        {
            var serviceClient = new WebPubSubClient(new Uri(ServiceUri), new WebPubSubClientOptions
            {
                Protocol = new WebPubSubProtobufProtocol()
            });
    
            serviceClient.Connected += arg =>
            {
                Console.WriteLine($"Connected with connection id: {arg.ConnectionId}");
                return Task.CompletedTask;
            };
    
            serviceClient.Disconnected += arg =>
            {
                Console.WriteLine($"Disconnected from connection id: {arg.ConnectionId}");
                return Task.CompletedTask;
            };
    
            serviceClient.GroupMessageReceived += arg =>
            {
                Console.WriteLine($"Received Protobuf message: {arg.Message.Data}");
                return Task.CompletedTask;
            };
            await serviceClient.StartAsync();
            await serviceClient.JoinGroupAsync(Group);
            UpstreamMessage joiningMessage = new()
            {
                JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
                {
                    Group = Group,
                }
            };
            await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(joiningMessage.ToByteArray()), WebPubSubDataType.Binary);
    
            Console.WriteLine("Join group message sent");
            await Task.Delay(1000);
    
            while (true)
            {
                Console.WriteLine("Enter the message to send or just press enter to stop:");
                var message = Console.ReadLine();
    
                if (!string.IsNullOrEmpty(message))
                {
                    UpstreamMessage dataMessage = new()
                    {
                        EventMessage = new UpstreamMessage.Types.EventMessage()
                        {
                            Data = new MessageData()
                            {
                                TextData = message
                            }
                        }
                    };
                    await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(dataMessage.ToByteArray()), WebPubSubDataType.Binary);
                    Console.WriteLine("Data message sent");
                }
                else
                {
                    await serviceClient.LeaveGroupAsync(Group);
                    await serviceClient.StopAsync();
                    break;
                }
            }
        }
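
    If you stay on the raw ClientWebSocket from the question instead of WebPubSubClient, note that a single ReceiveAsync call may return only part of a message when the payload is larger than the buffer or arrives in several frames. Parsing such a partial buffer with DownstreamMessage.Parser would fail. The following is a minimal sketch that collects frames until EndOfMessage is set; it uses only System.Net.WebSockets, and WebSocketFrames and ReceiveFullMessageAsync are hypothetical names.

    ```csharp
    using System;
    using System.IO;
    using System.Net.WebSockets;
    using System.Threading;
    using System.Threading.Tasks;

    static class WebSocketFrames
    {
        // Reads frames until the current message is complete.
        // Returns null if the peer sent a close frame instead.
        public static async Task<byte[]?> ReceiveFullMessageAsync(
            WebSocket socket, CancellationToken cancellationToken)
        {
            var buffer = new byte[8192];
            using var assembled = new MemoryStream();
            WebSocketReceiveResult result;
            do
            {
                result = await socket.ReceiveAsync(
                    new ArraySegment<byte>(buffer), cancellationToken);
                if (result.MessageType == WebSocketMessageType.Close)
                {
                    return null;
                }
                // Append only the bytes actually received in this frame.
                assembled.Write(buffer, 0, result.Count);
            } while (!result.EndOfMessage);

            return assembled.ToArray();
        }
    }
    ```

    The returned array can be passed to DownstreamMessage.Parser.ParseFrom in place of the manually copied messageBytes in the question's receive loop.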
    
    
