Deploy Kafka and Kafka Connect.
Create a connector with the following pubSubSource configuration:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "pubSubSource",
  "config": {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": "1",
    "cps.subscription": "pubsub-test-sub",
    "kafka.topic": "kafka-sub-topic",
    "cps.project": "test-project123",
    "gcp.credentials.file.path": "/tmp/gcp-creds/account-key.json"
  }
}'
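Once the connector is registered, its state and that of its task can be checked through the standard Connect REST API (same host/port as above; pubSubSource is the connector name used in the request):
curl http://localhost:8083/connectors/pubSubSource/status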
Below are the Kafka Connect worker configurations:
"plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable": "false"
"value.converter.schemas.enable": "false"
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
"config.storage.replication.factor": "1"
"offset.storage.replication.factor": "1"
"status.storage.replication.factor": "1"
Publish a message to the Pub/Sub topic (the one backing the pubsub-test-sub subscription) using the below command:
gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
Read messages from the destination Kafka topic:
/usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-sub-topic --from-beginning
# Output
{"someKey":"someValue"}
{"someKey":"someValue"}
A single published message shows up on the Kafka topic twice. Why is this happening? Is there something I am doing wrong?
I found the information below at https://cloud.google.com/pubsub/docs/faq, and it seems you are facing the same issue. Could you try publishing a larger message and see if the result is the same?
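One quick way to try that (a sketch; the ~100 KB payload size is arbitrary, well below Pub/Sub's 10 MB message limit):
gcloud pubsub topics publish test-topic --message="$(head -c 100000 /dev/zero | tr '\0' 'x')"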
Details from the link:
Why are there too many duplicate messages?
Pub/Sub guarantees at-least-once message delivery, which means that occasional duplicates are to be expected. However, a high rate of duplicates may indicate that the client is not acknowledging messages within the configured ack_deadline_seconds, and Pub/Sub is retrying the message delivery. This can be observed in the monitoring metrics pubsub.googleapis.com/subscription/pull_ack_message_operation_count for pull subscriptions, and pubsub.googleapis.com/subscription/push_request_count for push subscriptions. Look for elevated expired or webhook_timeout values in the /response_code. This is particularly likely if there are many small messages, since Pub/Sub may batch messages internally and a partially acknowledged batch will be fully redelivered.
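Given that, it is worth inspecting (and possibly raising) the ack deadline on the subscription from your question; a sketch using standard gcloud commands:
# Current ack deadline (the Pub/Sub default is 10 seconds)
gcloud pubsub subscriptions describe pubsub-test-sub --format="value(ackDeadlineSeconds)"
# Raise it, e.g. to 60 seconds, if acknowledgements are arriving late
gcloud pubsub subscriptions update pubsub-test-sub --ack-deadline=60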
Another possibility is that the subscriber is not acknowledging some messages because the code path processing those specific messages fails, and the Acknowledge call is never made; or the push endpoint never responds or responds with an error.
How do I detect duplicate messages?
Pub/Sub assigns a unique message_id to each message, which can be used to detect duplicate messages received by the subscriber. This will not, however, allow you to detect duplicates resulting from multiple publish requests on the same data. Detecting those will require a unique message identifier to be provided by the publisher. See Pub/Sub I/O for further discussion.
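To see the message_id values actually being delivered, you can pull a few messages with gcloud (note: pulling from the connector's own subscription competes with the connector, so ideally do this on a separate test subscription attached to the same topic):
# Default table output includes a MESSAGE_ID column; messages are not acked here, so they will be redelivered
gcloud pubsub subscriptions pull pubsub-test-sub --limit=5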