
How to quickly subtract one ushort array from another in C#?


I need to quickly subtract each value in ushort arrayA from the corresponding index value in ushort arrayB which has an identical length.

In addition, if the difference is negative, I need to store a zero, not the negative difference.

(Length = 327680 to be exact, since I'm subtracting a 640x512 image from another image of identical size).

The code below is currently taking ~20ms and I'd like to get it down under ~5ms if possible. Unsafe code is ok, but please provide an example, as I'm not super-skilled at writing unsafe code.

Thank you!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

UPDATE: While it's not strictly C#, for the benefit of others who read this, I finally ended up adding a C++ CLR Class Library to my solution with the following code. It runs in ~3.1ms. If an unmanaged C++ library is used, it runs in ~2.2ms. I decided to go with the managed library since the time difference was small.

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

Then I call it like this:

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }

Solution

  • Some benchmarks.

    1. SubtractBackgroundFromBuffer: this is the original method from the question.
    2. SubtractBackgroundFromBufferWithCalcOpt: this is the original method augmented with TTat's idea for improving the calculation speed.
    3. SubtractBackgroundFromBufferParallelFor: the solution from Selman22's answer.
    4. SubtractBackgroundFromBufferBlockParallelFor: my answer. Similar to 3., but breaks the processing up into blocks of 4096 values.
    5. SubtractBackgroundFromBufferPartitionedParallelForEach: Geoff's first answer.
    6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: Geoff's second answer.

    Updates

    Interestingly, I can get a small speed increase (~6%) for SubtractBackgroundFromBufferBlockParallelFor by using (as suggested by Bruno Costa)

    Buffer[i] = (ushort)Math.Max(difference, 0);
    

    instead of

    if (difference >= 0)
        Buffer[i] = (ushort)difference;
    else
        Buffer[i] = 0;
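To make the equivalence of the two forms above concrete, here is a minimal sketch that runs both on a few edge cases. The class and method names (`ClampedSubtract`, `WithBranch`, `WithMax`) are hypothetical and only for illustration; the sketch checks correctness only and says nothing about speed.

```csharp
using System;

public static class ClampedSubtract
{
    // Variant 1: explicit branch, as in the original method.
    public static ushort WithBranch(ushort a, ushort b)
    {
        int difference = a - b; // ushort operands are promoted to int, so this can be negative
        if (difference >= 0)
            return (ushort)difference;
        else
            return 0;
    }

    // Variant 2: Math.Max clamps a negative difference to zero.
    public static ushort WithMax(ushort a, ushort b)
    {
        return (ushort)Math.Max(a - b, 0);
    }

    public static void Main()
    {
        // Edge cases: equal values, largest negative and positive differences, off-by-one.
        ushort[,] pairs = { { 0, 0 }, { 0, 65535 }, { 65535, 0 }, { 100, 101 }, { 101, 100 } };

        for (int i = 0; i < pairs.GetLength(0); i++)
        {
            ushort a = pairs[i, 0];
            ushort b = pairs[i, 1];
            Console.WriteLine($"{a} - {b}: branch={WithBranch(a, b)}, max={WithMax(a, b)}");
        }
    }
}
```

Both variants do the subtraction in `int`, which is why the intermediate result can go negative before it is clamped and cast back to `ushort`.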
    

    Results

    Note that this is the total time for the 1000 iterations in each run.

    SubtractBackgroundFromBuffer(ms):                                 2,062.23 
    SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
    SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
    SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
    SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
    SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60
    

    So it seems from those results that the best approach combines a calculation optimization for a small gain with parallel processing over chunks of the image (Parallel.For over blocks, or Parallel.ForEach over a partitioner) rather than one work item per element, which was the slowest variant here. Your mileage will of course vary, since the performance of parallel code is sensitive to the CPU it runs on.
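As a rough illustration of the chunked idea, here is a self-contained sketch that passes both buffers as parameters instead of using the `Buffer` property. The names `ChunkedSubtract` and `Subtract` are hypothetical, and the default `rangeSize` of 4096 is simply borrowed from method 4. It was not part of the benchmark run, so no timing is implied.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ChunkedSubtract
{
    // Subtracts background from buffer in place, clamping negative results to zero.
    // Each parallel work item processes one contiguous range of at most rangeSize elements.
    public static void Subtract(ushort[] buffer, ushort[] background, int rangeSize = 4096)
    {
        if (buffer.Length != background.Length)
            throw new ArgumentException("Buffers must have the same length.");
        if (buffer.Length == 0)
            return; // Partitioner.Create rejects an empty range

        // The partitioner yields (fromInclusive, toExclusive) tuples; the last range
        // is truncated to the array length, so no manual bounds clamping is needed.
        Parallel.ForEach(Partitioner.Create(0, buffer.Length, rangeSize), range =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                buffer[i] = (ushort)Math.Max(buffer[i] - background[i], 0);
            }
        });
    }

    public static void Main()
    {
        ushort[] image = { 5, 10, 0, 65535, 7 };
        ushort[] background = { 10, 3, 0, 1, 7 };

        // A tiny range size, just to exercise a partial final range.
        Subtract(image, background, 2);

        Console.WriteLine(string.Join(", ", image));
    }
}
```

Whether an explicit range size beats the default partitioning depends on the machine, so it is worth measuring both with the harness below.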

    Test Harness

    I ran this for each method in Release mode. The Stopwatch is started and stopped around each call so that only the processing time is measured, not the time spent generating the buffer.

    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);
    
    for (int i = 0; i < 1000; i++)
    {
        Buffer = GenerateRandomBuffer(327680, 128011992);                
    
        sw.Start();
        SubtractBackgroundFromBuffer(bgImg);
        sw.Stop();
    }
    
    Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    
    
    public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
    {
        // Allocate exactly the requested number of elements.
        ushort[] buffer = new ushort[size];
        Random random = new Random(randomSeed);
    
        for (int i = 0; i < size; i++)
        {
            // The upper bound of Random.Next is exclusive, so 65535 is never generated.
            buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
        }
    
        return buffer;
    }
    

    The Methods

    public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        int bufferLength = Buffer.Length;
    
        for (int index = 0; index < bufferLength; index++)
        {
            int difference = Buffer[index] - backgroundBuffer[index];
    
            if (difference >= 0)
                Buffer[index] = (ushort)difference;
            else
                Buffer[index] = 0;
        }
    }
    
    public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
    {
        int bufferLength = Buffer.Length;
    
        for (int index = 0; index < bufferLength; index++)
        {
            if (Buffer[index] < backgroundBuffer[index])
            {
                Buffer[index] = 0;
            }
            else
            {
                Buffer[index] -= backgroundBuffer[index];
            }
        }
    }
    
    public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
    {
        Parallel.For(0, Buffer.Length, (i) =>
        {
            int difference = Buffer[i] - backgroundBuffer[i];
            if (difference >= 0)
                Buffer[i] = (ushort)difference;
            else
                Buffer[i] = 0;
        });
    }        
    
    public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
    {
        int blockSize = 4096;
    
        Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
        {
            // Clamp the block end so a final partial block cannot run past the array.
            int end = Math.Min((j + 1) * blockSize, Buffer.Length);
    
            for (int i = j * blockSize; i < end; i++)
            {
                int difference = Buffer[i] - backgroundBuffer[i];
    
                Buffer[i] = (ushort)Math.Max(difference, 0);
            }
        });
    }
    
    public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
    {
        Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
            {
                for (int i = range.Item1; i < range.Item2; ++i)
                {
                    if (Buffer[i] < backgroundBuffer[i])
                    {
                        Buffer[i] = 0;
                    }
                    else
                    {
                        Buffer[i] -= backgroundBuffer[i];
                    }
                }
            });
    }
    
    public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
    {
        Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
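                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    // A bool occupies a single byte, so read it through a byte*;
                    // an int* would also read three bytes that do not belong to it.
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((byte*)(&nonNegative)));
                }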
            }
        });
    }
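Since the question asks for an unsafe example, here is a minimal sketch of a single-threaded pointer loop using `fixed`. It is not one of the benchmarked methods, so no timing is claimed for it. The names `UnsafeSubtract` and `Subtract` are hypothetical, and the project must be compiled with unsafe code enabled (`AllowUnsafeBlocks`).

```csharp
using System;

public static class UnsafeSubtract
{
    // Subtracts background from buffer in place, clamping negative results to zero.
    public static unsafe void Subtract(ushort[] buffer, ushort[] background)
    {
        if (buffer.Length != background.Length)
            throw new ArgumentException("Buffers must have the same length.");

        // Pin both arrays so the garbage collector cannot move them while
        // raw pointers into them are in use.
        fixed (ushort* pBuffer = buffer, pBackground = background)
        {
            ushort* p = pBuffer;                    // write cursor into buffer
            ushort* q = pBackground;                // read cursor into background
            ushort* end = pBuffer + buffer.Length;  // one past the last element

            while (p < end)
            {
                int difference = *p - *q;
                *p = (ushort)(difference > 0 ? difference : 0);
                p++;
                q++;
            }
        }
    }

    public static void Main()
    {
        ushort[] image = { 5, 10, 0, 65535 };
        ushort[] background = { 10, 3, 0, 1 };

        Subtract(image, background);

        Console.WriteLine(string.Join(", ", image));
    }
}
```

The pointer loop could also be run per range inside one of the partitioned methods above, with the `fixed` block placed inside the lambda, since a pinned pointer cannot be captured by it.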