Tags: python, arrays, numpy, fortran, f2py

Why is univariate Horner in Fortran faster than its NumPy counterpart, while bivariate Horner is not?


I want to perform polynomial calculations in Python. The polynomial package in numpy is not fast enough for me, so I decided to rewrite several functions in Fortran and use f2py to create shared libraries that are easily imported into Python. Currently I am benchmarking my routines for univariate and bivariate polynomial evaluation against their numpy counterparts.

In the univariate routine I use Horner's method as does numpy.polynomial.polynomial.polyval. I have observed that the factor by which the Fortran routine is faster than the numpy counterpart increases as the order of the polynomial increases.
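
For reference, this is the recurrence both implementations use, sketched in plain Python (horner is a hypothetical helper, with coefficients ordered from low to high degree as numpy's polyval expects):

def horner(p, x):
    # Evaluate p[0] + p[1]*x + ... + p[-1]*x**(len(p) - 1)
    pval = 0.0
    for i in xrange(len(p) - 1, -1, -1):
        pval = pval*x + p[i]
    return pval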

In the bivariate routine I use Horner's method twice: first in y and then in x. Unfortunately I have observed that for increasing polynomial order, the numpy counterpart catches up and eventually surpasses my Fortran routine. As numpy.polynomial.polynomial.polyval2d uses an approach similar to mine, I consider this second observation to be strange.
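
Concretely, the scheme mirrors the polyval2 routine below: a Horner pass in x over each coefficient column, accumulated by a Horner pass in y. A plain-Python sketch (horner2 is a hypothetical helper; p is a 2-D numpy array where p[i, j] multiplies x**i * y**j, the same layout polyval2d uses):

def horner2(p, x, y):
    pval = 0.0
    for j in xrange(p.shape[1] - 1, -1, -1):      # Horner in y (outer loop)
        tmp = 0.0
        for i in xrange(p.shape[0] - 1, -1, -1):  # Horner in x (inner loop)
            tmp = tmp*x + p[i, j]
        pval = pval*y + tmp
    return pval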

I am hoping that this result stems from my inexperience with Fortran and f2py. Does anyone have a clue why the univariate routine always appears superior, while the bivariate routine is only superior for low-order polynomials?

EDIT Here is my latest code, benchmark script and performance plots:

polynomial.f95

subroutine polyval(p, x, pval, nx)

    implicit none

    real(8), dimension(nx), intent(in) :: p
    real(8), intent(in) :: x
    real(8), intent(out) :: pval
    integer, intent(in) :: nx
    integer :: i

    ! Horner's method: accumulate from the highest-order coefficient down
    pval = 0.0d0
    do i = nx, 1, -1
        pval = pval*x + p(i)
    end do

end subroutine polyval

subroutine polyval2(p, x, y, pval, nx, ny)

    implicit none

    real(8), dimension(nx, ny), intent(in) :: p
    real(8), intent(in) :: x, y
    real(8), intent(out) :: pval
    integer, intent(in) :: nx, ny
    real(8) :: tmp
    integer :: i, j

    ! Horner in x (inner loop), accumulated by Horner in y (outer loop)
    pval = 0.0d0
    do j = ny, 1, -1
        tmp = 0.0d0
        do i = nx, 1, -1
            tmp = tmp*x + p(i, j)
        end do
        pval = pval*y + tmp
    end do

end subroutine polyval2

subroutine polyval3(p, x, y, z, pval, nx, ny, nz)

    implicit none

    real(8), dimension(nx, ny, nz), intent(in) :: p
    real(8), intent(in) :: x, y, z
    real(8), intent(out) :: pval
    integer, intent(in) :: nx, ny, nz
    real(8) :: tmp, tmp2
    integer :: i, j, k

    ! Horner in x, then y, then z (innermost to outermost)
    pval = 0.0d0
    do k = nz, 1, -1
        tmp2 = 0.0d0
        do j = ny, 1, -1
            tmp = 0.0d0
            do i = nx, 1, -1
                tmp = tmp*x + p(i, j, k)
            end do
            tmp2 = tmp2*y + tmp
        end do
        pval = pval*z + tmp2
    end do

end subroutine polyval3
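
A note on the generated interface: f2py turns the intent(out) argument into the return value and infers trailing dimension arguments such as nx from the array shapes, so from Python the routines are called as polynomial.polyval(p, x) rather than polyval(p, x, pval, nx). A quick sanity check (assuming the module was compiled under the name polynomial, as in the benchmark script below):

import numpy as np
import polynomial  # the f2py-compiled module

p = np.random.rand(10)
x = 0.5
print polynomial.polyval(p, x)                # pval is returned, nx is inferred
print np.polynomial.polynomial.polyval(x, p)  # should agree to rounding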

benchmark.py (use this script to produce plots)

import time
import os

import numpy as np
import matplotlib.pyplot as plt

# Compile and import Fortran module
os.system('f2py -c polynomial.f95 --opt="-O3 -ffast-math" \
--f90exec="gfortran-4.8" -m polynomial')
import polynomial

# Create random x, y and z values
x = np.random.rand()
y = np.random.rand()
z = np.random.rand()

# Number of timing repetitions
repetition = 10

# Number of times to loop over a function per repetition
run = 100

# Number of data points
points = 26

# Min and max number of coefficients for the univariate case
n_uni_min = 4
n_uni_max = 100

# Min and max number of coefficients for the bivariate case
n_bi_min = 4
n_bi_max = 100

# Min and max number of coefficients for the trivariate case
n_tri_min = 4
n_tri_max = 100

# Case on/off switch
case_on = [1, 1, 1]

case_1_done = 0
case_2_done = 0
case_3_done = 0

#=================#
# UNIVARIATE CASE #
#=================#

if case_on[0]:

    # Array containing the polynomial order + 1 for several univariate polynomials.
    # Use a loop variable other than x here: in Python 2 the comprehension
    # variable leaks into the enclosing scope and would overwrite the
    # evaluation point x defined above.
    n_uni = np.array([int(n) for n in np.linspace(n_uni_min, n_uni_max, points)])

    # Initialise arrays for storing timing results
    time_uni_numpy = np.zeros(n_uni.size)
    time_uni_fortran = np.zeros(n_uni.size)

    for i in xrange(len(n_uni)):
        # Create random univariate polynomial of order n - 1
        p = np.random.rand(n_uni[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval(x, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_uni_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval(p, x)
            t2 = time.time()
            dt.append(t2 - t1)
        time_uni_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_uni = time_uni_numpy / time_uni_fortran

    results_uni = np.zeros([len(n_uni), 4])
    results_uni[:, 0] = n_uni
    results_uni[:, 1] = factor_uni
    results_uni[:, 2] = time_uni_numpy
    results_uni[:, 3] = time_uni_fortran
    print results_uni, '\n'

    plt.figure()
    plt.plot(n_uni, factor_uni)
    plt.title('Univariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_uni[0], n_uni[-1])
    plt.ylim(0, max(factor_uni))
    plt.grid(aa=True)

case_1_done = 1

#================#
# BIVARIATE CASE #
#================#

if case_on[1]:

    # Array containing the polynomial order + 1 for several bivariate polynomials
    n_bi = np.array([int(n) for n in np.linspace(n_bi_min, n_bi_max, points)])

    # Initialise arrays for storing timing results
    time_bi_numpy = np.zeros(n_bi.size)
    time_bi_fortran = np.zeros(n_bi.size)

    for i in xrange(len(n_bi)):
        # Create random bivariate polynomial of order n - 1 in x and in y
        p = np.random.rand(n_bi[i], n_bi[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval2d(x, y, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_bi_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        p = np.asfortranarray(p)
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval2(p, x, y)
            t2 = time.time()
            dt.append(t2 - t1)
        time_bi_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_bi = time_bi_numpy / time_bi_fortran

    results_bi = np.zeros([len(n_bi), 4])
    results_bi[:, 0] = n_bi
    results_bi[:, 1] = factor_bi
    results_bi[:, 2] = time_bi_numpy
    results_bi[:, 3] = time_bi_fortran
    print results_bi, '\n'

    plt.figure()
    plt.plot(n_bi, factor_bi)
    plt.title('Bivariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_bi[0], n_bi[-1])
    plt.ylim(0, max(factor_bi))
    plt.grid(aa=True)

case_2_done = 1

#=================#
# TRIVARIATE CASE #
#=================#

if case_on[2]:

    # Array containing the polynomial order + 1 for several trivariate polynomials
    n_tri = np.array([int(n) for n in np.linspace(n_tri_min, n_tri_max, points)])

    # Initialise arrays for storing timing results
    time_tri_numpy = np.zeros(n_tri.size)
    time_tri_fortran = np.zeros(n_tri.size)

    for i in xrange(len(n_tri)):
        # Create random trivariate polynomial of order n - 1 in x, y and z
        p = np.random.rand(n_tri[i], n_tri[i], n_tri[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval3d(x, y, z, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_tri_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        p = np.asfortranarray(p)
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval3(p, x, y, z)
            t2 = time.time()
            dt.append(t2 - t1)
        time_tri_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_tri = time_tri_numpy / time_tri_fortran

    results_tri = np.zeros([len(n_tri), 4])
    results_tri[:, 0] = n_tri
    results_tri[:, 1] = factor_tri
    results_tri[:, 2] = time_tri_numpy
    results_tri[:, 3] = time_tri_fortran
    print results_tri

    plt.figure()
    plt.plot(n_tri, factor_tri)
    plt.title('Trivariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_tri[0], n_tri[-1])
    plt.ylim(0, max(factor_tri))
    plt.grid(aa=True)
    print '\n'

case_3_done = 1

#==============================================================================

plt.show()

Results: [univariate, bivariate and trivariate performance plots]

EDIT A correction to steabert's proposed routine:

subroutine polyval(p, x, pval, nx)

    implicit none

    real*8, dimension(nx), intent(in) :: p
    real*8, intent(in) :: x
    real*8, intent(out) :: pval
    integer, intent(in) :: nx

    integer, parameter :: simd = 8
    real*8 :: tmp(simd), xpower(simd), maxpower
    integer :: i, j, k

    xpower(1) = x
    do i = 2, simd
        xpower(i) = xpower(i-1)*x
    end do
    maxpower = xpower(simd)

    tmp = 0.0d0
    do i = nx+1, simd+2, -simd
        do j = 1, simd
            tmp(j) = tmp(j)*maxpower + p(i-j)*xpower(simd-j+1)
        end do
    end do

    k = mod(nx-1, simd)
    if (k == 0) then
        pval = sum(tmp) + p(1)
    else
        pval = sum(tmp) + p(k+1)
        do i = k, 1, -1
            pval = pval*x + p(i)
        end do
    end if

end subroutine polyval

EDIT Test code to verify that the routine directly above gives poor results for x > 1:

import polynomial as P
import numpy.polynomial.polynomial as PP

import numpy as np

for n in xrange(2,100):
    poly1n = np.random.rand(n)
    poly1f = np.asfortranarray(poly1n)

    x = 2

    print np.linalg.norm(P.polyval(poly1f, x) - PP.polyval(x, poly1n)), '\n'

Solution

  • Following the other suggestions, using p = np.asfortranarray(p) before the timer indeed put the performance on par with numpy when I tested it. I extended the range for the bivariate benchmark to n_bi = np.array([2**i for i in xrange(1, 15)]), so that the p matrix would be larger than my L3 cache.
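
    To see why the placement matters: the wrapper for polyval2 needs a Fortran-contiguous coefficient array, so if p is left in its default C order, f2py silently makes a Fortran-ordered copy on every call, and for large matrices that copy can dominate the measured time. A minimal sketch of the check (assuming the compiled polynomial module from the question):

    import numpy as np

    p = np.random.rand(4096, 4096)   # C-ordered by default
    print np.isfortran(p)            # False: the wrapper would copy p on every call
    p = np.asfortranarray(p)         # convert the layout once, outside the timer
    print np.isfortran(p)            # True: calls can now pass p without copying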

    To optimize this further, I don't think automatic compiler options will help much, since the inner loop carries a dependency. Only if you unroll it manually does ifort vectorize the innermost loop. With gfortran, -O3 and -ffast-math were needed. For matrix sizes limited by main-memory bandwidth, this increases the performance benefit over numpy from a factor of 1 to a factor of 3.

    Update: after applying this also to the univariate code and compiling with f2py --opt='-O3 -ffast-math' -c -m polynomial polynomial.f90, I get the following source, with the benchmark.py results shown below:

    subroutine polyval(p, x, pval, nx)
    
    implicit none
    
    real*8, dimension(nx), intent(in) :: p
    real*8, intent(in) :: x
    real*8, intent(out) :: pval
    integer, intent(in) :: nx
    
    integer, parameter :: simd = 8
    real*8 :: tmp(simd), vecx(simd), xfactor
    integer :: i, j, k
    
    ! precompute factors
    do i = 1, simd
        vecx(i)=x**(i-1)
    end do
    xfactor = x**simd
    
    tmp = 0.0d0
    do i = 1, nx, simd
        do k = 1, simd
            tmp(k) = tmp(k)*xfactor + p(nx-(i+k-1)+1)*vecx(simd-k+1)
        end do
    end do
    pval = sum(tmp)
    
    
    end subroutine polyval
    
    subroutine polyval2(p, x, y, pval, nx, ny)
    
    implicit none
    
    real*8, dimension(nx, ny), intent(in) :: p
    real*8, intent(in) :: x, y
    real*8, intent(out) :: pval
    integer, intent(in) :: nx, ny
    
    integer, parameter :: simd = 8
    real*8 :: tmp(simd), vecx(simd), xfactor
    integer :: i, j, k
    
    ! precompute factors
    do i = 1, simd
        vecx(i)=x**(i-1)
    end do
    xfactor = x**simd
    
    ! horner
    pval=0.0d0
    do i = 1, ny
        tmp = 0.0d0
        do j = 1, nx, simd
            ! inner vectorizable loop
            do k = 1, simd
                tmp(k) = tmp(k)*xfactor + p(nx-(j+k-1)+1,ny-i+1)*vecx(simd-k+1)
            end do
        end do
        pval = pval*y + sum(tmp)
    end do
    
    end subroutine polyval2
    

    Update 2: As pointed out, this code is not correct, at least when the sizes are not divisible by simd. It just shows the concept of manually helping the compiler, so don't use it as-is. If the size is not a multiple of simd, a small remainder loop has to take care of the dangling indices. That is not difficult to do; here is the corrected procedure for the univariate case, and it should be straightforward to extend it to the bivariate case:

    subroutine polyval(p, x, pval, nx)
    implicit none
    
    real*8, dimension(nx), intent(in) :: p
    real*8, intent(in) :: x
    real*8, intent(out) :: pval
    integer, intent(in) :: nx
    
    integer, parameter :: simd = 4
    real*8 :: tmp(simd), vecx(simd), xfactor
    integer :: i, j, k, nr
    
    ! precompute factors
    do i = 1, simd
        vecx(i)=x**(i-1)
    end do
    xfactor = x**simd
    
    ! check remainder
    nr = mod(nx, simd)
    
    ! horner
    tmp = 0.0d0
    do i = 1, nx-nr, simd
        do k = 1, simd
            tmp(k) = tmp(k)*xfactor + p(nx-(i+k-1)+1)*vecx(simd-k+1)
        end do
    end do
    pval = sum(tmp)
    
    ! do remainder
    pval = pval * x**nr
    do i = 1, nr
        pval = pval + p(i) * vecx(i)
    end do
    end subroutine polyval
    

    [univariate performance plot]

    [bivariate performance plot]

    One should be careful with very small sizes, as the times will be too short to give an accurate performance profile. Relative times with respect to numpy can also be deceiving, because the absolute numpy times may be very poor. So below are timings for the largest case (a sketch of a more careful timing harness follows them):

    For univariate with nx = 2**20, the time is 1.21 s for numpy and 1.69e-3 s for the custom Fortran version. For bivariate with nx*ny = 2**20, the time is 8e-3 s for numpy and 1.68e-3 s for the custom version. That the custom code takes the same time for the univariate and bivariate cases when the total size nx*ny is the same is important: it supports the conclusion that the code is performing near the memory-bandwidth limit.
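
    For the small sizes, a harness along these lines, using timeit with several repeats and taking the best, is less noisy than differencing time.time() around a hand-written loop (a sketch, again assuming the compiled polynomial module):

    import timeit

    import numpy as np
    import polynomial

    p = np.asfortranarray(np.random.rand(50, 50))
    t = timeit.repeat(lambda: polynomial.polyval2(p, 0.3, 0.7),
                      repeat=5, number=1000)
    print min(t) / 1000  # best-of-5 per-call time, in seconds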

    Update 3: with the new python script for smaller sizes, and simd=4 I get the following performance:

    [performance plots]

    Update 4: As for correctness, the results are the same within double precision accuracy, which you can see if you run this python code for the univariate example:

    import polynomial as P
    import numpy.polynomial.polynomial as PP
    
    import numpy as np
    
    for n in xrange(2,100):
        poly1n = np.random.rand(n)
        poly1f = np.asfortranarray(poly1n)
    
        x = 2
    
        print "%18.14e" % P.polyval(poly1f, x)
        print "%18.14e" % PP.polyval(x, poly1n)
        print (P.polyval(poly1f, x) - PP.polyval(x, poly1n))/PP.polyval(x,poly1n), '\n'