Search code examples
c#.netprotocol-buffers

Trouble deserializing extended proto fields


I'm working on a CQG integration using protocol buffers (Proto2) in .NET environment. I've encountered a problem with deserializing extended fields in the protocol buffer. Despite the server sending the extended field (diagnostic session token) correctly, it seems to be unable to parse it. I need help understanding why this deserialization issue is occurring and how to resolve it.

Background

The Demo/Production proto files can be downloaded from CQG's documentation. Among these, there is an extension specifically for diagnostic purposes:

syntax = "proto2";
package diagnostic_session_token_2;
option csharp_namespace = "CqgWebApi.DiagnosticSessionToken";

import "Proto/WebAPI/user_session_2.proto";
import "Proto/WebAPI/webapi_2.proto";

extend user_session_2.Logon
{
  // Specifies if session token should be included into each server message after logon.
  optional bool include_diagnostic_session_token = 300;
}

extend WebAPI_2.ServerMsg
{
  // Session token for diagnostic purposes.
  // It is added to each server message and it equals to session_token in the LogonResult message.
  optional string diagnostic_session_token = 300;
}

This extension is designed to add a diagnostic session token to each server message, mirroring the session_token in the LogonResult message.

Server Logs

Here are snippets of the server logs indicating that the extension is being sent and received (scroll rightmost):

Message in: logon {   user_name: "***"   password: "***"   private_label: "***"   client_app_id: "***"   client_version: "1"   drop_concurrent_session: true   protocol_version_minor: 173   protocol_version_major: 2   session_settings: 1   fingerprint: "2023-12-18T15:40:57.0588877Z"   [diagnostic_session_token_2.include_diagnostic_session_token]: true } 
Message out: logon_result {   result_code: 0   base_time: "2023-12-11T03:47:03"   session_token: "0fWzz47ZC63k7Kfp5T2ja26yVeb9JTl89nZ3X0m5XbYnWomsknDAR+ugjOCBLw6++27dAGV5XhI"   protocol_version_minor: 181   protocol_version_major: 2   user_id: 125089   server_time: 647632704 } [diagnostic_session_token_2.diagnostic_session_token]: "0fWzz47ZC63k7Kfp5T2ja26yVeb9JTl89nZ3X0m5XbYnWomsknDAR+ugjOCBLw6++27dAGV5XhI" 
...
Message out: information_reports {   id: 5   status_code: 1   symbol_resolution_report {     contract_metadata {       contract_id: 5       contract_symbol: "F.US.KWEZ23"       correct_price_scale: 0.125       display_price_scale: 103       description: "KC HRW Wheat (Globex): December 2023"       title: "KWEZ23"       tick_size: 0.25       currency: "USD"       tick_value: 12.5       cfi_code: "FXXXXX"       is_most_active: false       last_trading_date: 245577000       first_notice_date: -964023000       instrument_group_name: "F.US.KWE"       session_info_id: 5043       mic: "XKBT"       short_instrument_group_name: "KWE"       instrument_group_description: "KC HRW Wheat (Globex)"       dialect_id: "0"       country_code: "US"       contract_size: "5000 Bushels"       position_tracking: 1       speculation_type_required: false       maturity_month_year: "Z23"       price_display_mode: PRICE_DISPLAY_MODE_NUMERATOR       volume_scale {         significand: 1       }       volume_display_exponent: 0       trade_size_increment {         significand: 1       }       extended_description: "KC HRW Wheat (Globex): December 2023"       mic_description: "KANSAS CITY BOARD OF TRADE"       has_exchange_volume: true       maintenance_margin: 2587       contract_size_in_units {         significand: 5         exponent: 3       }       contract_size_unit {         key: "Bushels"         text: "Bushels"       }       listing_period_type: 0       listing_period_value: 12       symbol_id: "CAT.3.2279071"       deleted: true       has_inverted_price_ladder: false       has_yields: false       contributor_group_id: 3       market_state_group_id: 84       cqg_contract_symbol: "F.US.KWEZ23"       pricing_convention: 1       bar_building_tick_types: 3       bar_building_tick_types: 4       quoted_in: "USD  Cents  and quarter Cents bushel"       product_symbol_id: "CAT.1.34791"       exchange_id: 3       supports_continuation: true       initial_margin: 2907     }     deleted: true   } } [diagnostic_session_token_2.diagnostic_session_token]: "0fWzz47ZC63k7Kfp5T2ja26yVeb9JTl89nZ3X0m5XbYnWomsknDAR+ugjOCBLw6++27dAGV5XhI" 

Code Snippets

The relevant code includes methods for logon and processing server responses.

public async Task LogonAsync(string username, string password, string privateLabel, string clientAppId, string clientAppVersion, Func<LogonResult, ValueTask> onSuccess, Func<string, ValueTask> onError)
{
    var logon = new Logon
    {
        UserName = username,
        Password = password,
        PrivateLabel = privateLabel,
        ClientAppId = clientAppId,
        ClientVersion = clientAppVersion,
        SessionSettings = { (uint)Logon.Types.SessionSetting.AllowSessionRestore },
        ProtocolVersionMinor = (uint)ProtocolVersionMinor.ProtocolVersionMinor,
        ProtocolVersionMajor = (uint)ProtocolVersionMajor.ProtocolVersionMajor,
        DropConcurrentSession = true,
        Fingerprint = DateTime.UtcNow.ToString("o")
    };

    logon.SetExtension(DiagnosticSessionToken2Extensions.IncludeDiagnosticSessionToken, true);
    
    await RequestAsync(new() { Logon = logon }, onSuccess, onError, serverMsg => serverMsg.LogonResult != null, serverMsg => serverMsg.LogonResult);
}

void CreateSubscription() =>
    _mainSubscription = _client.MessageReceived
        .Select(msg => ServerMsg.Parser.ParseFrom(msg.Binary))
        .Where(parsedMsg => parsedMsg != null)
        .TakeWhile(_ => _sessionStateValues.Unsubscribe != true)
        .ObserveOn(TaskPoolScheduler.Default)
        .SubscribeSafelyAsync(ProcessServerResponse, _logger);

async Task ProcessServerResponse(ServerMsg serverMsg)
{
    if (serverMsg.HasExtension(DiagnosticSessionToken2Extensions.DiagnosticSessionToken))
    {
        var sessionToken = serverMsg.GetExtension(DiagnosticSessionToken2Extensions.DiagnosticSessionToken);
        _logger.LogInformation("Session Token: {SessionToken}", sessionToken);
    }
    else
    {
        _logger.LogInformation("Session Token not present in the message."); // TODO: This is always the case due to the serialization issue
    }

    ...
}

I created a unit test to illustrate this issue:

public class ServerMsgTests
{
    [Fact]
    public void GivenServerMsgWithDiagnosticSessionToken_WhenSerializedAndDeserialized_ShouldContainToken()
    {
        // Arrange
        var expectedToken = "test_token";
        var serverMsg = new ServerMsg();
        serverMsg.SetExtension(DiagnosticSessionToken2Extensions.DiagnosticSessionToken, expectedToken);

        // Serialize
        byte[] serializedData;
        using (var memoryStream = new MemoryStream())
        {
            serverMsg.WriteTo(memoryStream);
            serializedData = memoryStream.ToArray();
        }

        // Act
        var deserializedServerMsg = ServerMsg.Parser.ParseFrom(serializedData);
        var actualToken = deserializedServerMsg.GetExtension(DiagnosticSessionToken2Extensions.DiagnosticSessionToken);

        // Assert
        actualToken.Should().Be(expectedToken);
    }
}

Despite the server correctly sending the diagnostic_session_token, it doesn't seem to parse this extended field.

What I've tried:

  • Verifying server-side transmission of the extended field.
  • Replicating the issue in a unit test.

Any advice or suggestions on how to resolve this would be greatly appreciated.


Solution

  • The problem is that you're parsing without using an ExtensionRegistry.

    Something like this should work:

    // All of this part can be in a single place, and the parser reused.
    var extension = DiagnosticSessionToken2Extensions.DiagnosticSessionToken;
    // Include all the extensions you need.
    var registry = new ExtensionRegistry { extension };
    var parser = ServerMsg.Parser.WithExtensionRegistry(registry);
    
    var deserializedServerMsg = parser.ParseFrom(serializedData);