I have a list of data and want to create the number of Tasks corresponding to the number of elements in the list. But I don't know how to Complete a Channel properly.
My code, but the Channel doesn't close as I expect.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Ding.LearningNewThings
public class MultipleChannel
public static async Task RunMiltipleChannel()
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
var listStatus = new bool[10];
for (int k = 0; k < listPlace.Count(); k++)
var place = listPlace[k];
listTask[k] = Task.Run(async () =>
int kk = k;
int count = 0;
Random r = new Random();
while (count < 10)
int id = r.Next(1, 1000);
var position = new Position()
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
lock (listStatus)
if(count == 10)
listStatus[k] = true;
bool isStop = true;
foreach(var status in listStatus)
if (!status)
isStop = false;
if (isStop)
listTask[10] = Task.Run(async () =>
while (await _dataChannel.Reader.WaitToReadAsync())
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
await Task.WhenAll(listTask);
public class Place
public int ID { get; set; }
public string Name { get; set; }
public static List<Place> InitData()
var listData = new List<Place>();
for (int i = 0; i < 10; i++)
var data = new Place()
ID = i,
Name = $"Postion{i}",
return listData;
public class Position
public int ID { get; set; }
public int PlaceID { get; set; }
public string PlaceName { get; set; }
public string Name { get; set; }
public static List<Position> InitData()
var listData = new List<Position>();
for (int i = 0; i < 10; i++)
var data = new Position()
ID = i,
Name = $"Postion{i}"
return listData;
In case I want to create separate Channels for each Task, how do I Complete them? Please give me an example.
Here's the modified code.
I eliminated the lock and complicated logic there. Also fixed variable capture and added a sanity check on reads.
Comments are in line. Please try running and ask if you have questions.
public class MultipleChannel
public static async Task RunMiltipleChannel()
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
//Count the number of writer tasks that finished
int completedTasks = 0;
for (int k = 0; k < listPlace.Count; k++)
var place = listPlace[k];
//Important to avoid closures
var kCapture = k;
listTask[kCapture] = Task.Run(async () =>
int kk = kCapture;
int count = 0;
Random r = new Random();
while (count < 10)
int id = r.Next(1, 1000);
var position = new Position()
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
//Thread-safe check to see if this is the last task to complete
if (Interlocked.Increment(ref completedTasks) == 10)
Console.WriteLine($"Task {kk} finished, CHANNEL COMPLETED");
Console.WriteLine($"Task {kk} finished");
//Make sure we read everything
int readCount = 0;
listTask[10] = Task.Run(async () =>
while (await _dataChannel.Reader.WaitToReadAsync())
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
await Task.WhenAll(listTask);
//Sanity check
Console.WriteLine($"Read {readCount} position data");
I can confirm channel close, and 100 items read.