I have this very simple Beam Pipeline that reads records from a Kafka Topic and writes them to a Pulsar Topic:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(
KafkaIO.<Long, String>read()
.withTopic("<topic>")
.withBootstrapServers("<url>")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(getConsumerProps())
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create())
.apply(ParDo.of(new PulsarSink()));
p.run();
From my understanding this should create exactly one Kafka Consumer that pushes it's values down the Pipeline. Now for some reason the Pipeline seems to restart over and over again creating multiple Kafka Consumers and multiple Pulsar Producers.
Here is an excerpt from the logs that show multiple Kafka Consumers being created:
2019-06-07 16:08:30,010 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:30,097 INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
2019-06-07 16:08:30,204 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,205 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,684 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 0 (total 1): 292330999892453.events.all.v1.json-0
2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 1 (total 1): 292330999892453.events.all.v1.json-1
2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 2 (total 1): 292330999892453.events.all.v1.json-2
2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:30,721 INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
2019-06-07 16:08:30,734 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,734 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,742 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,742 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,743 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,743 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,116 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,117 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,145 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,145 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,147 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,148 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,351 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: reading from 292330999892453.events.all.v1.json-0 starting at offset 318186186
2019-06-07 16:08:31,352 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_1189437256_292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:31,359 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,359 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,389 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-1: reading from 292330999892453.events.all.v1.json-1 starting at offset 318738731
2019-06-07 16:08:31,389 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-1_offset_consumer_1231768376_292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:31,394 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-2: reading from 292330999892453.events.all.v1.json-2 starting at offset 318129714
2019-06-07 16:08:31,394 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-2_offset_consumer_64443017_292330999892453
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-06-07 16:08:31,395 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,395 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,397 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,398 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,613 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Fetch offset 318129714 is out of range for partition 292330999892453.events.all.v1.json-2, resetting offset
2019-06-07 16:08:31,613 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Fetch offset 318186186 is out of range for partition 292330999892453.events.all.v1.json-0, resetting offset
2019-06-07 16:08:31,641 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 320367573.
2019-06-07 16:08:31,641 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 321301099.
2019-06-07 16:08:31,648 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,649 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Fetch offset 318738731 is out of range for partition 292330999892453.events.all.v1.json-1, resetting offset
2019-06-07 16:08:31,667 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,672 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 320867070.
2019-06-07 16:08:31,714 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,860 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
2019-06-07 16:08:31,885 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
2019-06-07 16:08:31,905 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
2019-06-07 16:08:31,938 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
2019-06-07 16:08:31,957 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
2019-06-07 16:08:31,981 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
2019-06-07 16:08:32,142 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: first record offset 321301099
Why are Kafka Consumers being restarted over and over again? Is this the expected behavior?
The primary purpose of the DirectRunner is local testing of Pipelines. As such, it exhibits behavior, that might be suboptimal performance-wise. One example might be that it purposefully serializes and deserializes data between operators, even though it is not necessary. The reason is to validate that the application code does not modify the input object, which is forbidden in Beam. The reason why there are many consumers being created is another example of this - DirectRunner performs a checkpoint (including many possibly unnecessary steps, like re-creating the consumer), very often - see here.
As such, DirectRunner really should be used only in tests and/or moderate conditions, where performance is not a concern. When performance is a concern, a different runner should be used - either some distributed, or local version of such runner - e.g. local FlinkRunner.