Search code examples
c#parallel-processingcancellationtokensource

Parallel.ForEach CancellationTokenSource not Stopping


I'm currently writing a ProxyChecker library. I'm using a Thread that funs a Parallel.ForEach loop to check all proxies. I'm using a CancellationTokenSource (cts) to make a soft abort (with cts.Cancel()). As u can see in the following code I added a little "test code" which writes the current Threads to the Console.

Here is the code u need:

private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
        {
            _cts = new CancellationTokenSource();
            int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
            mainThread = new Thread(() =>
            {
                try
                {
                    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
                    {
                        Interlocked.Increment(ref running);
                        Console.WriteLine("thread running: {0}", running);
                        try
                        {
                            _cts.Token.ThrowIfCancellationRequested();
                            if (CheckProxy(prox, domainToCheckWith, timeout))
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Increment(ref goodProxies);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                            }
                            else
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                                Interlocked.Increment(ref badProxies);
                            }
                            _cts.Token.ThrowIfCancellationRequested();
                            OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                        }
                        catch (OperationCanceledException ex) {}
                        catch (ObjectDisposedException ex) {}
                        catch (Exception ex)
                        {
                            OnLog(ex.Message, Color.Red);
                        }
                        finally
                        {
                            Console.WriteLine("thread running: {0}", running);
                            Interlocked.Decrement(ref running);
                        }
                    });
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    isRunning = false;
                    OnComplete();
                }
            });
            mainThread.Start();
        }

The output (I took out a few lines since it's useless to give u the full code)

thread running: 1
thread running: 1
thread running: 2
thread running: 2

//Slowly going up to  50

thread running: 50
thread running: 50
thread running: 50

//Staying at 50 till I press stop

thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46

//Going down...

thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4

And then it stops at 4 or 3 or 2 (different each time). I waited a few minutes, but it didn't go down nor the code after the Parallel.ForEach gets executed.

The timeout for the request is 5000, the threads are 50.

Heres the other code for the checking:

private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);

        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}

Stop function:

public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}

IMPORTANT EDIT:

I added this to the CeckProxy function:

        Stopwatch sw = new Stopwatch();
        sw.Start();
        string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
        sw.Stop();

This is the result of the last few threads:

thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3

why is this so long? I mean 180 seconds?!


Solution

  • Ok, I figured it out myself.

    I now read the response continuous and check with a stopwatch (and request.ReadWriteTimeout) that the reading part stops after a specific time (In my case readTimeout) is reached. Code

    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
                req.Proxy = new WebProxy(proxy);
                req.Timeout = timeout;
                req.ReadWriteTimeout = readTimeout;
                req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
                req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
    
                byte[] responseByte = new byte[1024];
                string responseString = string.Empty;
    
                sw.Start();
                using (WebResponse res = req.GetResponse())
                {
                    using (Stream stream = res.GetResponseStream())
                    {
                        while (stream.Read(responseByte, 0, responseByte.Length) > 0)
                        {
                            responseString += Encoding.UTF8.GetString(responseByte);
                            if(sw.ElapsedMilliseconds > (long)timeout)
                                throw new WebException();
                        }
    
                    }
                }
                sw.Stop();