Could anyone please provide sample Spring Boot code for a Debezium connector listener class and a configuration class for MongoDB that deliver the changed-data payload as JSON? Thanks in advance!
Here's my Debezium connector listener:
@Slf4j
@Component
public class DebeziumListener {

    // Runs the engine on its own thread so it does not block application startup.
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    // Configuration here is io.debezium.config.Configuration, not the Spring annotation.
    public DebeziumListener(Configuration mongoConnector) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(mongoConnector.asProperties()).notifying(t -> {
                    try {
                        handleChangeEvent(t);
                        log.error("Running Debezium Engine.");
                    } catch (Exception e) {
                        log.error("Error on running Debezium Engine.");
                    }
                }).build();
    }

    /**
     * Handles each change event captured from the database, as configured in DebeziumConnectorConfig.
     * @param sourceRecordChangeEvent the change event delivered by the engine
     * @throws IOException
     */
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) throws IOException {
        SourceRecord sourceRecord = sourceRecordChangeEvent.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
            if (operation != Operation.READ) {
                // Deletes read the BEFORE field; inserts and updates read AFTER.
                String records = operation == Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(records);
                Map<String, Object> payload = struct.schema().fields().stream().map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                log.info("Changes have been detected for product id: {} with Operation: {}",
                        Optional.ofNullable(payload).map(p -> p.get("productId")).orElse(null), operation.name());
                log.debug("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }
}
And here is the connector configuration class:
@Configuration
public class DebeziumConnectorConfig {

    // Exposes the Debezium configuration as a bean so it can be injected into DebeziumListener.
    @Bean
    public io.debezium.config.Configuration mongoConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "mongo-connector")
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("", "")
                .with("", offsetStorageTempFile.getAbsolutePath())
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .with("", "false")
                .with("tasks.max", "1")
                .with("mongodb.hosts", "rs0/localhost,27017")
                .with("", "products_topic")
                .with("errors.log.include.messages", "true")
                .with("capture.mode", "change_streams_update_full")
                .with("collection.include.list", "myCollection")
                .with("database.include.list", "myDb")
                .with("schema", "true")
                .with("", 0)
                .with("database.history.kafka.bootstrap.servers", "kafka:9092")
                .with("transforms", "route")
                .with("transforms.route.type", "org.apache.kafka.connect.transforms.RegexRouter")
                .with("transforms.route.regex", "([^.]+)\\.([^.]+)\\.([^.]+)")
                .with("transforms.route.replacement", "$3")
                .build();
    }
}
After the application starts, the server keeps logging the following messages:
2022-05-13 09:01:14.455 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017
2022-05-13 09:01:14.456 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017
I'm fairly certain you have a typo in your mongodb.hosts configuration property. It should have a value of:
rs0/localhost:27017
(Note the : rather than the , you have in your question.)
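To illustrate why the separator matters, here is a minimal sketch of how a value in the replicaSet/host:port,host:port form breaks into host entries. MongoHostsCheck and its two methods are hypothetical helpers written only for this illustration; this is plain Java that approximates the documented format, not Debezium's own parsing code.

```java
import java.util.ArrayList;
import java.util.List;

public class MongoHostsCheck {

    // Splits "replicaSet/host:port,host:port" into its comma-separated host entries.
    public static List<String> hostEntries(String hosts) {
        int slash = hosts.indexOf('/');
        String addresses = slash >= 0 ? hosts.substring(slash + 1) : hosts;
        List<String> entries = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    // Heuristic (an assumption, not a Debezium rule): an entry made only of digits
    // is probably a port that was separated from its host by a comma.
    public static boolean hasStrayPort(String hosts) {
        for (String entry : hostEntries(hosts)) {
            if (entry.matches("\\d+")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hostEntries("rs0/localhost,27017"));  // [localhost, 27017]
        System.out.println(hostEntries("rs0/localhost:27017"));  // [localhost:27017]
        System.out.println(hasStrayPort("rs0/localhost,27017")); // true
        System.out.println(hasStrayPort("rs0/localhost:27017")); // false
    }
}
```

With the comma, the value reads as two separate hosts, "localhost" and "27017", rather than one host on port 27017, which would be consistent with the replica set monitor repeatedly checking members without settling.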