Search code examples
apache-flinkflink-streaminggoogle-cloud-pubsub

Getting http2 exception on a Flink Job with a GCP PubSub source


I have a flink job that uses GCP PubSub as the source. Even though I'm able to process messages I receive on a pubsub topic, I see a couple of problems with it:

  1. The message that was processed earlier shows up again (Indication that acknowledgement wasn't made)
  2. I see grpc http2 exceptions on my flink job along with the checkpoint logs

Exception stack trace:

io.grpc.StatusRuntimeException: INTERNAL: http2 exception
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99)
    at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:330)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1057)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1080)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil.headerListSizeExceeded(Http2CodecUtil.java:245)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.headerSizeExceeded(DefaultHttp2FrameReader.java:694)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.addFragment(DefaultHttp2FrameReader.java:710)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:481)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:491)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1526)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1275)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

The below seem to be the underlying cause

Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)

Here's how I configured my PubSubSource object:

    PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .withPubSubSubscriberFactory(100, Duration.ofSeconds(90), 3)
            .build();

Checkpointing interval is 60s (lesser than the ack deadline)

What could be the cause for the above 2 issues ?


Solution

  • I increased the maxInboundMetadataSize further to 1MB and then it started to work.

    public class CustomPubSubSubscriberFactory implements PubSubSubscriberFactory {
      private final int retries;
      private final Duration timeout;
      private final int maxMessagesPerPull;
      private final String projectSubscriptionName;
    
      public CustomPubSubSubscriberFactory(
          String projectSubscriptionName, int retries, Duration pullTimeout, int maxMessagesPerPull) {
        this.retries = retries;
        this.timeout = pullTimeout;
        this.maxMessagesPerPull = maxMessagesPerPull;
        this.projectSubscriptionName = projectSubscriptionName;
      }
    
      @Override
      public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
        ManagedChannel channel =
            NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
                .negotiationType(NegotiationType.TLS)
                .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
                .maxInboundMetadataSize(1048576)
                .build();
    
        PullRequest pullRequest =
            PullRequest.newBuilder()
                .setMaxMessages(maxMessagesPerPull)
                .setSubscription(projectSubscriptionName)
                .build();
        SubscriberGrpc.SubscriberBlockingStub stub =
            SubscriberGrpc.newBlockingStub(channel)
                .withCallCredentials(MoreCallCredentials.from(credentials));
        return new BlockingGrpcPubSubSubscriber(
            projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
      }
    }
    

    Invoked as

    PubSubSource pubSubSource =
            PubSubSource.newBuilder()
                .withDeserializationSchema(new SomeDeserializer())
                .withProjectName("some-project")
                .withSubscriptionName("some-subscription")
                .withCredentials(GoogleCredentials.fromStream(file))
                .withPubSubSubscriberFactory(
                    new CustomPubSubSubscriberFactory(
                        ProjectSubscriptionName.format(
                            "some-project", "some-subscription"),
                        3,
                        Duration.ofSeconds(90),
                        100))            
                .build();