The error below repeats every 15s when my Flink job reads from a Pub/Sub source (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/):
Source: pub-sub-source -> filter -> (Sink: Docstore Sink, Map -> Sink: pinot-kafka-sink) (2/2)#3 (b7790c8ab117377fb8d85b1af23b1d11) switched from RUNNING to FAILED.
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 14.999620292s. [remote_addr=pubsub.googleapis.com/123.123.12.12:443]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.pull(SubscriberGrpc.java:1641)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:73)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:67)
at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.run(PubSubSource.java:128)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
I create the PubSubSource like this:
    PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .build();
Is there any config I need to set so that this error doesn't occur when no messages are available in the Pub/Sub subscription?
I have already set the checkpoint interval to be shorter than the acknowledgement deadline on the Pub/Sub subscription, but the error still occurs.
The deadline you are hitting appears to be the pullTimeout, which is set to 15s on the default subscriber (used when no explicit factory is defined):
    pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(
        // subscriptionName
        ProjectSubscriptionName.format(projectName, subscriptionName),
        // retries
        3,
        // pullTimeout
        Duration.ofSeconds(15),
        // messagesPerPull
        100
    );
You could define a custom factory for your PubSubSource to adjust these timeouts, via the withPubSubSubscriberFactory() method on the source builder:
PubSubSource.newBuilder()
.withPubSubSubscriberFactory(...)
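As a rough illustration of that builder call, here is a minimal sketch. It assumes the connector's overload withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries), which tunes the default factory without a custom class. SimpleStringSchema stands in for the question's SomeDeserializer, credentials are assumed to come from the application default, and the class name, job name, 30s timeout and 10s checkpoint interval are illustrative values only.

```java
import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Hypothetical job class; requires the flink-connector-gcp-pubsub dependency.
public class PubSubTimeoutJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // PubSubSource requires checkpointing; 10s is an illustrative interval.
        env.enableCheckpointing(10_000L);

        PubSubSource<String> source =
                PubSubSource.newBuilder()
                        // Stand-in for the question's SomeDeserializer (assumption).
                        .withDeserializationSchema(new SimpleStringSchema())
                        .withProjectName("some-project")
                        .withSubscriptionName("some-subscription")
                        // Arguments: maxMessagesPerPull, perRequestTimeout, retries.
                        // 100 and 3 match the defaults quoted above; 30s replaces 15s.
                        .withPubSubSubscriberFactory(100, Duration.ofSeconds(30), 3)
                        .build();

        env.addSource(source).print();
        env.execute("pubsub-timeout-sketch");
    }
}
```

Whether a longer per-request timeout actually stops the failures on an idle subscription would need to be checked against your own job.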
Per the source code, you should also make sure that the checkpoint interval is significantly shorter than your Pub/Sub-related timeouts (that is, checkpoints run much more often than those timeouts expire):
"The PubSubSource REQUIRES Checkpointing to be enabled and the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message."
You could try increasing the pullTimeout to a larger value (such as 30s or 1m) while setting the checkpoint interval to some fraction of that (half, a quarter, etc.) to see whether that resolves the timeouts.
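The "fraction of the pull timeout" suggestion is plain arithmetic. The sketch below uses only java.time to make it concrete; the class and method names are hypothetical, and the 30s and 1m inputs are just the example values mentioned above.

```java
import java.time.Duration;

/** Hypothetical helper: derives a checkpoint interval as a fraction of the pull timeout. */
final class CheckpointSizing {
    private CheckpointSizing() {}

    /**
     * Returns pullTimeout divided by divisor, e.g. divisor 2 for "half"
     * or 4 for "a quarter" of the pull timeout.
     */
    static Duration checkpointInterval(Duration pullTimeout, int divisor) {
        if (divisor < 1) {
            throw new IllegalArgumentException("divisor must be >= 1");
        }
        if (pullTimeout.isZero() || pullTimeout.isNegative()) {
            throw new IllegalArgumentException("pullTimeout must be positive");
        }
        return pullTimeout.dividedBy(divisor);
    }
}
```

For a 30s pull timeout and a divisor of 2 this gives 15s, and calling toMillis() on the result gives a value that could be passed to env.enableCheckpointing(...).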