We are setting up a Kafka topic on a 3-machine cluster. We are going to be pulling messages off a Solace queue and are wondering what the tasks.max parameter does and how it divides the messages. We do not have access to the Solace queue and can only call to find out whether we are dropping messages off the buffer. Any insight would be great! The current system is HornetQ and it runs 5 threads.
Environment:
Solace-kafka-connector-source-master
We have tried it in standalone mode with a single task and it works. Since we don't control the Solace queue, we cannot see directly whether we are dropping messages.
{
"name": "solaceSourceConnector",
"config": {
"connector.class": "com.solace.source.connector.SolaceSourceConnector",
"tasks.max": "1",
"kafka.topic": "solacetest",
"sol.host": "HOSTNAME",
"sol.username": "USERNAME",
"sol.password": "PASSWORD",
"sol.vpn_name": "VPN IP",
"sol.topics": "soltest, soltest1,solacetest2",
"sol.queue": "testQ",
"sol.message_processor_class": "com.solace.source.connector.msgProcessors.SolaceSampleKeyedMessageProcessor",
"sol.generate_send_timestamps": "true",
"sol.generate_rcv_timestamps": "true",
"sol.sub_ack_window_size": "255",
"sol.generate_sequence_numbers": "true",
"sol.calculate_message_expiration": "true",
"sol.subscriber_dto_override": "true",
"sol.channel_properties.connect_retries": "-1",
"sol.channel_properties.reconnect_retries": "-1",
"sol.kafka_message_key": "DESTINATION",
"sol.ssl_trust_store": "/opt/PKI/skeltonCA/heinz1.ts",
"sol.ssl_trust_store_password": "sasquatch",
"sol.ssl_trust_store_format": "JKS",
"sol.ssl_key_store": "/opt/PKI/skeltonCA/heinz1.ks",
"sol.ssl_key_store_password": "sasquatch",
"sol.ssl_key_store_format": "JKS",
"sol.ssl_key_store_normalized_format": "JKS",
"sol.ssl_private_key_alias": "heinz1",
"sol.ssl_private_key_password": "sasquatch"
}
}
I am trying to make sure we aren't losing messages in the buffer.
Setting tasks.max to 1 works with an Exclusive Queue, since that kind of Solace queue can have only one active subscriber. It also guarantees that message order is preserved from Solace into Kafka.
If the Solace queue is a Non-Exclusive (i.e. Shared) Queue, then tasks.max sets an upper bound on the number of concurrent Solace clients consuming from the queue. Raising it allows horizontal scaling, with multiple consumers on the queue for better throughput. However, Solace does not guarantee message order across multiple consumers of a Non-Exclusive Queue, so message ordering may not be preserved.
In either case messages are not lost, because they are not acknowledged back to Solace (and therefore not deleted from the queue) until Kafka has acknowledged that they were written successfully.
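For the Non-Exclusive Queue case, the change to the posted config could look roughly like the sketch below. It assumes the queue really is non-exclusive, and the value of 5 is only an illustration chosen to mirror the 5 HornetQ threads mentioned in the question. Only the settings relevant to scaling are shown; the remaining sol.* settings would stay as posted. Note that Kafka Connect treats tasks.max as an upper bound, so the number of tasks actually started may be lower.

```json
{
  "name": "solaceSourceConnector",
  "config": {
    "connector.class": "com.solace.source.connector.SolaceSourceConnector",
    "tasks.max": "5",
    "kafka.topic": "solacetest",
    "sol.queue": "testQ"
  }
}
```

With an Exclusive Queue, leaving "tasks.max" at "1" as in the original config is the safer choice, since extra tasks could not become active consumers anyway.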