Tags: apache-kafka, salesforce, apache-kafka-connect

Salesforce Bulk API Sink Connector creates too many batches


I am building a prototype to sync data from MySQL to Salesforce, running Kafka in Docker containers on my laptop. I followed the Quick Start for Confluent Platform to install and run Confluent Platform. To take data from MySQL and send it to Salesforce I am using the Debezium MySQL CDC Source Connector and the Salesforce Bulk API Sink Connector. Everything works fine, except that when updates from MySQL arrive one by one with a small delay between them (but longer than half a second), the connector creates a separate batch for each update. So, running the following bash script creates 3 batches in Salesforce:

for i in $(seq 3); do
    QUERY="UPDATE some_table SET state = 'state_$i' where id = 1;"
    mysql -u user_name -h host_name -e "$QUERY" -p<mysql_password>
    sleep .7
done
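
For context, this is roughly what the source side of such a pipeline looks like when submitted to the Kafka Connect REST API. It is a minimal sketch, not the actual config from the prototype: the host names, credentials, database/table names, and server name are placeholders, it assumes the pre-2.0 Debezium property names and the Quick Start's Connect endpoint on localhost:8083, and routing the resulting CDC topic to the sink's sf_sink topic (e.g. via a RegexRouter transform) is omitted.

    # Sketch of a Debezium MySQL source connector registration (placeholder values)
    curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "MySqlCdcSourceConnector_0",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "host_name",
        "database.port": "3306",
        "database.user": "user_name",
        "database.password": "<mysql_password>",
        "database.server.id": "184054",
        "database.server.name": "mysql_server",
        "table.include.list": "some_db.some_table",
        "database.history.kafka.bootstrap.servers": "broker:29092",
        "database.history.kafka.topic": "schema-changes.some_db"
      }
    }'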

The problem is that the Salesforce connector pushes records to Salesforce too often, while Salesforce limits the maximum number of batches to 15,000 per 24 hours. To guarantee staying within this limit I'd like to force the connector to buffer updates before pushing them to Salesforce. To achieve that I am trying to set the fetch.min.bytes and fetch.max.wait.ms consumer properties. The document Salesforce Bulk API Sink Connector Configuration Properties says:

You can override producer-specific properties by using the confluent.topic.producer. prefix and consumer-specific properties by using the confluent.topic.consumer. prefix.

So my attempt was to put these properties into the Salesforce connector config (here I want the connector to buffer records for 10 seconds or until they reach 1,000 bytes):

    "confluent.topic.consumer.fetch.max.wait.ms": "10000",
    "confluent.topic.consumer.fetch.min.bytes": "1000"

This seems to have no effect: fetch.max.wait.ms stays at 500 ms, which is the default value.
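
One way to see the effective value is to grep the Connect worker logs, since the Kafka consumer logs its full configuration when a sink task starts. A sketch, assuming the worker runs in a container named connect, as in the Quick Start docker-compose:

    # The consumer created for the sink task logs a "ConsumerConfig values:" block at startup;
    # filter it down to the two fetch settings in question.
    docker logs connect 2>&1 | grep -A 40 'ConsumerConfig values' | grep -E 'fetch\.max\.wait\.ms|fetch\.min\.bytes'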

So, first I'd like to make sure that I am looking in the right direction in trying to use the fetch.min.bytes and fetch.max.wait.ms properties, i.e. that they are applicable to the connector at all. And second, how can I set these properties properly?


Solution

  • Thank you @OneCricketeer, the link you provided helped me solve the problem. I didn't have to change the connector.client.config.override.policy setting for the worker as described in the "Connector-level producer/consumer configuration overrides" section; setting consumer.override.fetch.max.wait.ms and consumer.override.fetch.min.bytes on the connector was enough. The final version of the connector's config now looks like this:

    {
      "name": "SalesforceBulkApiSinkConnectorConnector_0",
      "config": {
        "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "errors.tolerance": "all",
        "topics": "sf_sink",
        "errors.deadletterqueue.topic.name": "sf_sink_dlq",
        "salesforce.instance": "https://<instance>.my.salesforce.com",
        "salesforce.username": "<username>",
        "salesforce.password": "<password>",
        "salesforce.password.token": "<token>",
        "salesforce.object": "KafkaEvent__e",
        "salesforce.sink.object.operation": "insert",
        "override.event.type": "true",
        "salesforce.use.custom.id.field": "false",
        "behavior.on.api.errors": "log",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.replication.factor": "1",
        "reporter.bootstrap.servers": "broker:29092",
        "consumer.override.fetch.max.wait.ms": "10000",
        "consumer.override.fetch.min.bytes": "5000"
      }
    }
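
To apply the config, the JSON above can be posted to the Kafka Connect REST API (or, for an existing connector, sent with a PUT to its /config endpoint, passing only the inner config object). A sketch, assuming the JSON is saved as salesforce-bulk-sink.json and Connect listens on the Quick Start default of localhost:8083:

    # Create the connector from the JSON file shown above
    curl -s -X POST -H "Content-Type: application/json" \
      --data @salesforce-bulk-sink.json http://localhost:8083/connectors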