I am new in programming world. i am doing my graduation and also learning dotnet.
I want to iterate my list in parallel foreach but i want to use partition there. I have lack of knowledge so my code is not compiling.
Actually this way i did it first which is working.
Parallel.ForEach(MyBroker, broker =>,,
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
});
now i want to use partition so i change my code this way but now my code not compiling. here is code
Parallel.ForEach(MyBroker, broker,
(j, loop, subtotal) =>
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
return brokerRowWeightageRowNumber.RowNumber;
},
(finalResult) =>
var rownum= Interlocked.Increment(ref finalResult);
console.writeline(rownum);
);
please see my second set of code and show me how to restructure to use partition for parallel foreach to iterate my list.
please guide me. thanks
The Parallel.ForEach method has 20 overloads - perhaps try a different overload?
Without your dependencies included I can't give a 1-to-1 example on your implementation but here is an in-depth example (reformatted from here) that you can copy into your IDE and set debug breakpoints (if that's useful). Unfortunately building an instantiable overload of OrderablePartitioner appears non-trivial so sorry for all the boilerplate code:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections;
using System.Linq;
// Simple partitioner that will extract one (index,item) pair at a time,
// in a thread-safe fashion, from the underlying collection.
class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
{
// The collection being wrapped by this Partitioner
IEnumerable<T> m_referenceEnumerable;
// Class used to wrap m_index for the purpose of sharing access to it
// between an InternalEnumerable and multiple InternalEnumerators
private class Shared<U>
{
internal U Value;
public Shared(U item)
{
Value = item;
}
}
// Internal class that serves as a shared enumerable for the
// underlying collection.
private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
{
IEnumerator<T> m_reader;
bool m_disposed = false;
Shared<long> m_index = null;
// These two are used to implement Dispose() when static partitioning is being performed
int m_activeEnumerators;
bool m_downcountEnumerators;
// "downcountEnumerators" will be true for static partitioning, false for
// dynamic partitioning.
public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
{
m_reader = reader;
m_index = new Shared<long>(0);
m_activeEnumerators = 0;
m_downcountEnumerators = downcountEnumerators;
}
public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
{
if (m_disposed)
throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing");
// For static partitioning, keep track of the number of active enumerators.
if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);
return new InternalEnumerator(m_reader, this, m_index);
}
IEnumerator<KeyValuePair<long, T>> IEnumerable<KeyValuePair<long, T>>.GetEnumerator()
{
return this.GetEnumerator();
}
public void Dispose()
{
if (!m_disposed)
{
// Only dispose the source enumerator if you are doing dynamic partitioning
if (!m_downcountEnumerators)
{
m_reader.Dispose();
}
m_disposed = true;
}
}
// Called from Dispose() method of spawned InternalEnumerator. During
// static partitioning, the source enumerator will be automatically
// disposed once all requested InternalEnumerators have been disposed.
public void DisposeEnumerator()
{
if (m_downcountEnumerators)
{
if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
{
m_reader.Dispose();
}
}
}
IEnumerator IEnumerable.GetEnumerator()
{
throw new NotImplementedException();
}
}
// Internal class that serves as a shared enumerator for
// the underlying collection.
private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
{
KeyValuePair<long, T> m_current;
IEnumerator<T> m_source;
InternalEnumerable m_controllingEnumerable;
Shared<long> m_index = null;
bool m_disposed = false;
public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
{
m_source = source;
m_current = default(KeyValuePair<long, T>);
m_controllingEnumerable = controllingEnumerable;
m_index = index;
}
object IEnumerator.Current
{
get { return m_current; }
}
KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
{
get { return m_current; }
}
void IEnumerator.Reset()
{
throw new NotSupportedException("Reset() not supported");
}
// This method is the crux of this class. Under lock, it calls
// MoveNext() on the underlying enumerator, grabs Current and index,
// and increments the index.
bool IEnumerator.MoveNext()
{
bool rval = false;
lock (m_source)
{
rval = m_source.MoveNext();
if (rval)
{
m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
m_index.Value = m_index.Value + 1;
}
else m_current = default(KeyValuePair<long, T>);
}
return rval;
}
void IDisposable.Dispose()
{
if (!m_disposed)
{
// Delegate to parent enumerable's DisposeEnumerator() method
m_controllingEnumerable.DisposeEnumerator();
m_disposed = true;
}
}
}
// Constructor just grabs the collection to wrap
public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
: base(true, true, true)
{
// Verify that the source IEnumerable is not null
if (enumerable == null)
throw new ArgumentNullException("enumerable");
m_referenceEnumerable = enumerable;
}
// Produces a list of "numPartitions" IEnumerators that can each be
// used to traverse the underlying collection in a thread-safe manner.
// This will return a static number of enumerators, as opposed to
// GetOrderableDynamicPartitions(), the result of which can be used to produce
// any number of enumerators.
public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
{
if (numPartitions < 1)
throw new ArgumentOutOfRangeException("NumPartitions");
List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);
// Since we are doing static partitioning, create an InternalEnumerable with reference
// counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators
// are disposed, dynamicPartitions will be disposed.
var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
for (int i = 0; i < numPartitions; i++)
list.Add(dynamicPartitions.GetEnumerator());
return list;
}
// Returns an instance of our internal Enumerable class. GetEnumerator()
// can then be called on that (multiple times) to produce shared enumerators.
public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
{
// Since we are doing dynamic partitioning, create an InternalEnumerable with reference
// counting of spawned InternalEnumerators turned off. This returned InternalEnumerable
// will need to be explicitly disposed.
return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
}
// Must be set to true if GetDynamicPartitions() is supported.
public override bool SupportsDynamicPartitions
{
get { return true; }
}
}
Here are examples of how to structure Parallel.ForEach using the above OrderablePartitioner. See how you can refactor-out your finally-block entirely out of the ForEach impl?
public class Program
{
static void Main(string[] args)
{
//
// First a fairly simple visual test
//
var someCollection = new string[] { "four", "score", "and", "twenty", "years", "ago" };
var someOrderablePartitioner = new SingleElementOrderablePartitioner<string>(someCollection);
Parallel.ForEach(someOrderablePartitioner, (item, state, index) =>
{
Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", item, index, Thread.CurrentThread.ManagedThreadId);
});
//
// Now a more rigorous test of dynamic partitioning (used by Parallel.ForEach)
//
List<int> src = Enumerable.Range(0, 100000).ToList();
SingleElementOrderablePartitioner<int> myOP = new SingleElementOrderablePartitioner<int>(src);
int counter = 0;
bool mismatch = false;
Parallel.ForEach(myOP, (item, state, index) =>
{
if (item != index) mismatch = true;
Interlocked.Increment(ref counter);
});
if (mismatch) Console.WriteLine("OrderablePartitioner Test: index mismatch detected");
Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", counter);
}
}
Also this link might be useful ("Write a simple parallel.ForEach Loop")