I have figured out how to create unary RPC interceptors but I can't figure out how to make streaming RPC interceptors. This is what I have thus far:
public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
    IAsyncStreamReader<TRequest> requestStream, ServerCallContext context,
    ClientStreamingServerMethod<TRequest, TResponse> continuation)
{
    Console.WriteLine("ClientStreaming");
    var response = await base.ClientStreamingServerHandler(requestStream, context, continuation);
    return response;
}
This code snippet logs to the console every time a client stream is initiated; I just can't figure out how to log every incoming client message.
Kind Regards Jesse
I recently had a similar scenario, and this is how I solved it with an interceptor. We needed to measure the message throughput and message size through our gRPC API. The unary calls were pretty straightforward, but we ran into the same problem you have with streaming. The following is what I did to intercept the stream (server streaming in my case, but it should be similar for your scenario).
The first piece of code is the interceptor method, like the one you already have (this one is for server streaming):
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request,
    ClientInterceptorContext<TRequest, TResponse> context,
    AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
{
    // Add the outgoing message size to the metrics
    mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);
    // This call returns the server stream, among other things and the server stream reader
    // is returned for the client to consume.
    var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);
    // prelimResponse is the call object rather than a message, so casting it to IMessage
    // yields nothing to measure here. Each response message is measured by the wrapper
    // below as the client reads it.
    // Wrap the response stream object with our implementation that will log the size and then
    // proxy that to the client.
    var response = new AsyncServerStreamingCall<TResponse>(
        new AsyncStreamReaderWrapper<TResponse>(prelimResponse.ResponseStream,
            mInterceptorName,
            context.Method.ServiceName,
            context.Method.Name, mCollector),
        prelimResponse.ResponseHeadersAsync,
        prelimResponse.GetStatus,
        prelimResponse.GetTrailers,
        prelimResponse.Dispose);
    // return the wrapped stream to the client
    return response;
}
And here is the AsyncStreamReaderWrapper implementation, which receives each object, measures and logs its size, then passes it along to the client. This wrapper was needed because the stream reader can only have one consumer and would generate an error if I had multiple readers, which makes sense.
/// <summary>
/// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object
/// before handing off to the client for the purpose of throughput measurements and metrics
/// collection
/// </summary>
/// <typeparam name="T">type of object contained within the stream</typeparam>
public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
{
    private IAsyncStreamReader<T> mInnerImplementation = null;
    private NetworkMetricsCollectionService mCollector = null;
    private string mId = string.Empty;
    private string mService = string.Empty;
    private string mMethod = string.Empty;

    public T Current => mInnerImplementation.Current;

    /// <summary>
    /// Advances the reader to the next element in the sequence, returning the result asynchronously.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token that can be used to cancel the
    /// operation.</param>
    /// <returns>Task containing the result of the operation: true if the reader was successfully
    /// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        bool result = await mInnerImplementation.MoveNext(cancellationToken);
        if (result)
        {
            mCollector.Add(mId, mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
        }
        return result;
    }

    /// <summary>
    /// Parameterized Constructor
    /// </summary>
    /// <param name="aInnerStream">inner stream reader to wrap</param>
    /// <param name="aId">interceptor name for metrics reporting</param>
    /// <param name="aService">service name for metrics reporting</param>
    /// <param name="aMethod">method name for metrics reporting</param>
    /// <param name="aCollector">metrics collector</param>
    public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
    {
        mInnerImplementation = aInnerStream;
        mId = aId;
        mService = aService;
        mMethod = aMethod;
        mCollector = aCollector;
    }
}
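I know this is not the exact scenario you are asking about, but I believe your implementation will be similar, minus the bits for measuring message sizes. In your server-side handler the client's messages arrive through the IAsyncStreamReader<TRequest> argument, so a reader wrapper like this one should carry over; if you intercept client streaming on the client side instead, it is the stream writer you would wrap rather than the stream reader.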
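To tie this back to the question: ClientStreamingServerHandler receives the client's messages through its IAsyncStreamReader<TRequest> argument, so one way to log each incoming message is to wrap that reader before handing it to the continuation. The following is only a minimal sketch, assuming a Grpc.Core version in which IAsyncStreamReader<T> exposes just Current and MoveNext (as in the wrapper above). LoggingStreamReader and LoggingInterceptor are hypothetical names, and Console.WriteLine stands in for whatever logging you actually use.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;

// Hypothetical wrapper: logs every message as the service implementation reads it.
public class LoggingStreamReader<T> : IAsyncStreamReader<T>
{
    private readonly IAsyncStreamReader<T> _inner;

    public LoggingStreamReader(IAsyncStreamReader<T> inner)
    {
        _inner = inner;
    }

    // Proxy the current element straight from the wrapped reader.
    public T Current => _inner.Current;

    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        bool hasNext = await _inner.MoveNext(cancellationToken);
        if (hasNext)
        {
            // Runs once per incoming client message.
            Console.WriteLine($"Received client message: {_inner.Current}");
        }
        return hasNext;
    }
}

// Hypothetical server-side interceptor for client streaming calls.
public class LoggingInterceptor : Interceptor
{
    public override Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream,
        ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        // Runs once per call, like the Console.WriteLine in the question.
        Console.WriteLine($"Client stream started: {context.Method}");

        // Hand the wrapped reader to the service method so that each
        // MoveNext it performs passes through the logging wrapper.
        var loggingStream = new LoggingStreamReader<TRequest>(requestStream);
        return continuation(loggingStream, context);
    }
}
```

The log line is written only when the service method actually advances the stream, so messages the handler never reads would not be logged by this sketch.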