Tags: apache-flink, flink-streaming, google-cloud-pubsub

Getting DEADLINE_EXCEEDED on a Flink Job while using GCP PubSub source


The error below repeats every 15 seconds in my Flink job, which reads from a Pub/Sub source (connector docs: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/):

Source: pub-sub-source -> filter -> (Sink: Docstore Sink, Map -> Sink: pinot-kafka-sink) (2/2)#3 (b7790c8ab117377fb8d85b1af23b1d11) switched from RUNNING to FAILED.
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 14.999620292s. [remote_addr=pubsub.googleapis.com/123.123.12.12:443]
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.pull(SubscriberGrpc.java:1641)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:73)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:67)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.run(PubSubSource.java:128)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)

I create the PubSubSource like this:

PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .build();

Is there any configuration I need to set so that this error does not occur when no messages are available in the Pub/Sub subscription?

I have already set the checkpoint interval to be shorter than the acknowledgement deadline of the Pub/Sub subscription, but the error still occurs.


Solution

  • The deadline you are encountering seems to be the pullTimeout, which is set to 15s in the default subscriber factory (the one used when no explicit factory is defined):

    pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(
        // subscriptionName
        ProjectSubscriptionName.format(projectName, subscriptionName),
        // retries
        3,
        // pullTimeout
        Duration.ofSeconds(15),
        // messagesPerPull
        100
    );
    

    You may want to consider defining a custom factory for your PubSubSource configuration to adjust these timeouts further via the withPubSubSubscriberFactory() function on the source:

    PubSubSource.newBuilder()
        .withPubSubSubscriberFactory(...)
    

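    Per the source code, you should also make sure that checkpoints run much more often than your Pub/Sub-related timeouts expire (in other words, the checkpoint interval should be much shorter than the timeout) to avoid issues on that front. Note that the comment quoted here uses "frequency" in the sense of "interval":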

    "The PubSubSource REQUIRES Checkpointing to be enabled and the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message."

    You may want to try increasing the pullTimeout to a larger value (such as 30s or 1m) and setting the checkpoint interval to some fraction of that (half, a quarter, etc.) to see if that resolves the timeouts.
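
To make the arithmetic in that last suggestion concrete, here is a minimal pure-JDK sketch. The class `PullTimeoutPlan` and its methods are hypothetical helpers, not part of the Flink connector. It also assumes, going only by the stack trace in the question (three frames at BlockingGrpcPubSubSubscriber.java:77 followed by one at line 73), that the default subscriber makes one initial attempt plus `retries` further attempts before the exception reaches the task.

```java
import java.time.Duration;

/** Hypothetical helper (not part of Flink) for choosing related timeout values. */
public final class PullTimeoutPlan {
    private final Duration pullTimeout;
    private final int retries;

    public PullTimeoutPlan(Duration pullTimeout, int retries) {
        if (pullTimeout.isZero() || pullTimeout.isNegative()) {
            throw new IllegalArgumentException("pullTimeout must be positive");
        }
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        this.pullTimeout = pullTimeout;
        this.retries = retries;
    }

    /**
     * Longest time one pull can block before failing.
     * Assumption: one initial attempt plus `retries` retries, each bounded by pullTimeout.
     */
    public Duration worstCaseBlocking() {
        return pullTimeout.multipliedBy(retries + 1L);
    }

    /** Checkpoint interval as a fraction of the pull timeout (divisor 2 = half, 4 = a quarter). */
    public Duration checkpointInterval(int divisor) {
        if (divisor < 1) {
            throw new IllegalArgumentException("divisor must be at least 1");
        }
        return pullTimeout.dividedBy(divisor);
    }

    public static void main(String[] args) {
        // Defaults quoted in the answer: 15s pull timeout, 3 retries.
        PullTimeoutPlan defaults = new PullTimeoutPlan(Duration.ofSeconds(15), 3);
        // A relaxed variant: 1m pull timeout, checkpoint every quarter of that.
        PullTimeoutPlan relaxed = new PullTimeoutPlan(Duration.ofSeconds(60), 3);

        System.out.println(defaults.worstCaseBlocking());  // PT1M
        System.out.println(relaxed.checkpointInterval(4)); // PT15S
    }
}
```

The chosen pull timeout and retry count would then go into the factory passed to withPubSubSubscriberFactory(), and the interval into the job's checkpoint configuration, for example `env.enableCheckpointing(interval.toMillis())`. Treat the numbers as starting points to experiment with rather than recommended values.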