Search code examples
debezium

How to set the offset.commit.policy to AlwaysCommitOffsetPolicy in debezium?


I created a Debezium Embedded engine to capture MySQL change data. I want to commit the offsets as soon as I can. In the code, the config is created including follows.

.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())

Running this returns, java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)

However, When I start the embedded engine with, .with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()), the embedded engine works fine.

Note that the class OffsetCommitPolicy.PeriodicCommitOffsetPolicy constructor includes the config parameter while OffsetCommitPolicy.AlwaysCommitOffsetPolicy doesn't.

public PeriodicCommitOffsetPolicy(Configuration config) { ... }

How to get the debezium embedded engine to use its AlwaysCommitOffsetPolicy?


Solution

  • Thanks for the report. This is partly bug (which we would appreciate if you could log into our Jira). You can solve this issue by calling a dedicated method embedded engine builder like `io.debezium.embedded.EmbeddedEngine.create().with(OffsetCommitPolicy.always())'