I have to consume from a Kafka topic, clean and filter the JSON message, and then produce the new message to another Kafka topic. My code looks like this:
public static YamlMappingNode configs;

public static void Main(string[] args)
{
    using (var reader = new StreamReader(Path.Combine(Directory.GetCurrentDirectory(), ".gitlab-ci.yml")))
    {
        var yaml = new YamlStream();
        yaml.Load(reader);
        //find variables
        configs = (YamlMappingNode)yaml.Documents[0].RootNode;
        configs = (YamlMappingNode)configs.Children.Where(k => k.Key.ToString() == "variables")?.FirstOrDefault().Value;
    }
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };
    Run_ManualAssign(configs, cts.Token);
}
public static async void Run_ManualAssign(YamlMappingNode configs, CancellationToken cancellationToken)
{
    var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
    var topics = configs.Where(k => k.Key.ToString() == "input_kfk_topic")?.FirstOrDefault().Value.ToString();
    var config = new ConsumerConfig
    {
        // the group.id property must be specified when creating a consumer, even
        // if you do not intend to use any consumer group functionality.
        GroupId = new Guid().ToString(),
        BootstrapServers = brokerList,
        // partition offsets can be committed to a group even by consumers not
        // subscribed to the group. in this example, auto commit is disabled
        // to prevent this from occurring.
        EnableAutoCommit = true
    };
    using (var consumer =
        new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
            .Build())
    {
        //consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
        consumer.Assign(new TopicPartitionOffset(topics, 0, Offset.End));
        //var producer = new ProducerBuilder<Null, string>(config).Build();
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    /// Note: End of partition notification has not been enabled, so
                    /// it is guaranteed that the ConsumeResult instance corresponds
                    /// to a Message, and not a PartitionEOF event.
                    //filter message
                    var result = ReadMessage(configs, consumeResult.Message.Value);
                    //send to kafka topic
                    await Run_ProducerAsync(configs, result);
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
        }
    }
}
#region Run_Producer
public static async Task Run_ProducerAsync(YamlMappingNode configs, string message)
{
    var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
    var topicName = configs.Where(k => k.Key.ToString() == "target_kafka_topic")?.FirstOrDefault().Value.ToString();
    var config = new ProducerConfig {
        BootstrapServers = brokerList,
    };
    using (var producer = new ProducerBuilder<Null, string>(config).Build())
    {
        try
        {
            /// Note: Awaiting the asynchronous produce request below prevents flow of execution
            /// from proceeding until the acknowledgement from the broker is received (at the
            /// expense of low throughput).
            var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message });
            Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
        }
        // The type arguments must match the producer's <Null, string>, or this handler never fires.
        catch (ProduceException<Null, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }
}
#endregion
Am I doing something wrong here? The program exits immediately when it executes var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message }); with no error message and no error code.
Meanwhile, I used Python with the same producer configuration, and it works well.
Run_ManualAssign(configs, cts.Token);
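In this line of the Main function, you call an async method from a synchronous method without awaiting it. Run_ManualAssign hands control back to Main at its first await that does not complete synchronously (here, the ProduceAsync call). Main then returns and the program exits before the produce finishes, which is why you see no error.
You have two options. Both require changing Run_ManualAssign to return Task instead of void, because an async void method cannot be awaited or waited on.
Make Main async (async Task Main, available since C# 7.1) and add await in front of this call.
If you really want to call the async method from a synchronous method, block on the returned task:
Run_ManualAssign(configs, cts.Token).ConfigureAwait(false).GetAwaiter().GetResult();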
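The first option could look roughly like the sketch below. It is a minimal, Kafka-free illustration rather than a drop-in fix: RunLoopAsync is a hypothetical stand-in for Run_ManualAssign, and Task.Delay stands in for the Consume and ProduceAsync calls. The point is only that the method returns Task and that Main awaits it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // async Task Main requires C# 7.1 or later.
    public static async Task Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();    // ask the loop to stop instead.
        };

        // Awaiting keeps Main, and therefore the process, alive until the loop ends.
        await RunLoopAsync(cts.Token);
    }

    // Hypothetical stand-in for Run_ManualAssign. It returns Task (not void),
    // so the caller can await it and observe its exceptions.
    public static async Task RunLoopAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                // Assumption: stands in for consumer.Consume(...) followed by
                // await producer.ProduceAsync(...).
                await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
                Console.WriteLine("processed one message");
            }
        }
        catch (OperationCanceledException)
        {
            // Task.Delay throws TaskCanceledException, which derives from
            // OperationCanceledException, when the token is cancelled.
            Console.WriteLine("Closing consumer.");
        }
    }
}
```

With this shape the program keeps running until Ctrl+C cancels the token. With the original async void version, Main would return as soon as the first await yielded.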