When I query my consumer for the assigned topic partition list, every partition in the result has an offset of -1001. If I print the offset of a received message, however, the offset shows the correct value.
This is the code I use to consume the messages:
#include <stdio.h>
#include <windows.h>              /* Sleep() */
#include <librdkafka/rdkafka.h>

static void print_partition_list(FILE* fp,
                                 const rd_kafka_topic_partition_list_t* partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        fprintf(fp, "%s %s [%d] offset %lld",
                i > 0 ? "," : "",
                partitions->elems[i].topic,
                partitions->elems[i].partition,
                partitions->elems[i].offset);
    }
    fprintf(fp, "\n");
}
static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    fprintf(stderr, "%% Consumer group rebalanced: ");
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
        break;
    default:
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
        break;
    }
}
int main()
{
    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;
    char errstr[512];
    const char* brokers{ "localhost:9092" };
    const char* groupid{ "OffsetTest" };
    const char* topics[] = { "OffsetTesting" };
    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "group.id", groupid,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "offset.store.method", "broker",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }
    conf = NULL;
    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);
    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }
    fprintf(stderr,
            "%% Subscribed to %d topic(s), "
            "waiting for rebalance and messages...\n",
            subscription->cnt);
    rd_kafka_topic_partition_list_destroy(subscription);
    int runningCounter = 0;
    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                    "%% Consumer error: %s\n",
                    rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }
        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);
        if (err) {
            fprintf(stderr,
                    "%% Failed to get assignment: %s\n",
                    rd_kafka_err2str(err));
            rd_kafka_message_destroy(rkm);
            rd_kafka_destroy(rk);
            return 1;
        }
        print_partition_list(stderr, list);
        rd_kafka_topic_partition_list_destroy(list);
printf("Message on %s [%d] at offset %lld:\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);
if (rkm->key)
printf(" Key: %.*s\n",
(int)rkm->key_len, (const char*)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);
if (rkm->payload)
printf(" Value: %.*s\n",
(int)rkm->len, (const char*)rkm->payload);
else if (rkm->key)
printf(" Value: (%d bytes)\n", (int)rkm->len);
rd_kafka_commit_message(rk, rkm, 0);
rd_kafka_message_destroy(rkm);
        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}
I know there is an answer to a similar question here: LibRdKafka: commited_offset always at -1001, but it does not help. I do assign the topic partition list to the consumer in the rebalance_cb.
Update:
This is the output for two example messages:
> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
> % Subscribed to 1 topic(s), waiting for rebalance and messages...
> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
>
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1
I believe this may be by design.

The rd_kafka_assignment() method returns the assignment that was provided via rd_kafka_assign(). When a consumer is assigned partitions within a group, the assignment is only a list of partitions; there are no offsets.

Similarly, in the Java library, assignment() returns Set<TopicPartition>, with no offsets either. In librdkafka, rd_kafka_assignment() gives an rd_kafka_topic_partition_list_t, which is analogous to Set<TopicPartition>. The main difference is that it reuses the rd_kafka_topic_partition_t type, which has a few extra fields such as offset.
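For reference, a rough sketch of that type, abridged from rdkafka.h (the full definition also carries internal fields):

/* Abridged from rdkafka.h: the per-partition type reused across APIs. */
typedef struct rd_kafka_topic_partition_s {
    char    *topic;             /* topic name */
    int32_t  partition;         /* partition id */
    int64_t  offset;            /* offset; only meaningful in some contexts */
    void    *metadata;          /* commit metadata, if any */
    size_t   metadata_size;
    void    *opaque;            /* application opaque */
    rd_kafka_resp_err_t err;    /* per-partition error, context dependent */
    /* ... internal fields omitted ... */
} rd_kafka_topic_partition_t;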
The rd_kafka_topic_partition_t type is used in many places, and not all of its fields make sense in every context. That is the case in the assignment context, so some fields are set to "blank" values; for the offset, the blank value is -1001.
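librdkafka exposes that placeholder as the constant RD_KAFKA_OFFSET_INVALID, which is defined as -1001 in rdkafka.h. As a minimal sketch (the helper name print_partition_offsets is just for illustration), a printing routine could test for it rather than showing the raw number:

/* Sketch: treat RD_KAFKA_OFFSET_INVALID (-1001) as "no offset" when printing. */
static void print_partition_offsets(FILE* fp,
                                    const rd_kafka_topic_partition_list_t* partitions) {
    for (int i = 0; i < partitions->cnt; i++) {
        const rd_kafka_topic_partition_t* p = &partitions->elems[i];
        if (p->offset == RD_KAFKA_OFFSET_INVALID)
            fprintf(fp, "%s %s [%d] offset <none>",
                    i > 0 ? "," : "", p->topic, p->partition);
        else
            fprintf(fp, "%s %s [%d] offset %lld",
                    i > 0 ? "," : "", p->topic, p->partition, p->offset);
    }
    fprintf(fp, "\n");
}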
If you want the current offsets for the assignment, you need to use rd_kafka_position(). Likewise, in Java, you would use position().
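As a rough sketch, reusing rk and print_partition_list() from the question: fetch the assignment, then pass the same list to rd_kafka_position(), which fills each element's offset field with the consumer's current position (or RD_KAFKA_OFFSET_INVALID if nothing has been fetched from that partition yet):

/* Sketch: fill the assignment list with the consumer's current positions. */
rd_kafka_topic_partition_list_t* list;
rd_kafka_resp_err_t err = rd_kafka_assignment(rk, &list);
if (!err) {
    err = rd_kafka_position(rk, list);   /* sets list->elems[i].offset */
    if (!err)
        print_partition_list(stderr, list);
    else
        fprintf(stderr, "%% rd_kafka_position() failed: %s\n", rd_kafka_err2str(err));
    rd_kafka_topic_partition_list_destroy(list);
}

If instead you want the last committed offsets rather than the current consume positions, rd_kafka_committed() takes the same kind of partition list.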