Tags: python-3.x, rapids, cudf

TypingError in RAPIDS cuDF user-defined function


I have a cuDF DataFrame with Close and Date columns, where Close is float64 and Date is a (%Y-%m-%d) datetime64.

I want to define a function that takes those columns as inputs and builds what is known as a Market Profile. The data is granular, so there are many Close values within the same Date. The function should output 30 new columns: a min, max and count for each of ten price ranges, updated on every row, and reset whenever the day changes.

def profile(Close,Date,min1,min2,min3,min4,min5,min6,min7,min8,min9,min10,
            max1,max2,max3,max4,max5,max6,max7,max8,max9,max10, 
            count1,count2,count3,count4,count5,count6,count7,count8,count9,count10):
        values = []
        counters=[]
    
        for i, (price, date) in enumerate(zip(Close,Date)):
        
            if date != date.shift(-1):
                values = []
                counters=[]
        
            cl = int(price)
            values.append(cl)
            counters.append(cl.count())
        
            range = (max(values) - min(values))/10
    
            df = pd.DataFrame({'Value':values,'Index':counters})
        
            min1 = min(values)
            min2 = min1+range
            min3 = min2+range
            min4 = min3+range
            min5 = min4+range
            min6 = min5+range
            min7 = min6+range
            min8 = min7+range
            min9 = min8+range
            min10 = min9+range
    
            max1 = min2
            max2 = min3
            max3 = min4
            max4 = min5
            max5 = min6
            max6 = min7
            max7 = min8
            max8 = min9
            max9 = min10
            max10 = min10+range
    
            expr1 = 'Value >= @min1 and Value < @max1'
            expr2 = 'Value >= @min2 and Value < @max2'
            expr3 = 'Value >= @min3 and Value < @max3'
            expr4 = 'Value >= @min4 and Value < @max4'
            expr5 = 'Value >= @min5 and Value < @max5'
            expr6 = 'Value >= @min6 and Value < @max6'
            expr7 = 'Value >= @min7 and Value < @max7'
            expr8 = 'Value >= @min8 and Value < @max8'
            expr9 = 'Value >= @min9 and Value < @max9'
            expr10 = 'Value >= @min10 and Value < @max10'
    
            df1 = df.query(expr1)
            count1[i] = df1.Index.sum()
            df2 = df.query(expr2)
            count2[i] = df2.Index.sum()
            df3 = df.query(expr3)
            count3[i] = df3.Index.sum()
            df4 = df.query(expr4)
            count4[i] = df4.Index.sum()
            df5 = df.query(expr5)
            count5[i] = df5.Index.sum()
            df6 = df.query(expr6)
            count6[i] = df6.Index.sum()
            df7 = df.query(expr7)
            count7[i] = df7.Index.sum()
            df8 = df.query(expr8)
            count8[i] = df8.Index.sum()
            df9 = df.query(expr9)
            count9[i] = df9.Index.sum()
            df10 = df.query(expr10)
            count10[i] = df10.Index.sum()
        
            min1[i] = min1
            min2[i] = min2
            min3[i] = min3
            min4[i] = min4
            min5[i] = min5
            min6[i] = min6
            min7[i] = min7
            min8[i] = min8
            min9[i] = min9
            min10[i] = min10
    
            max1[i] = max1
            max2[i] = max2
            max3[i] = max3
            max4[i] = max4
            max5[i] = max5
            max6[i] = max6
            max7[i] = max7
            max8[i] = max8
            max9[i] = max9
            max10[i] = max10

The function was applied as follows:

import numpy as np
df = df.apply_rows(profile,
                   incols={'Close':'Close', 'Date':'Date'},
                   outcols={'min1':np.float64, 'max1':np.float64, 'count1':np.int16,
                            'min2':np.float64, 'max2':np.float64, 'count2':np.int16, 
                            'min3':np.float64, 'max3':np.float64, 'count3':np.int16, 
                            'min4':np.float64, 'max4':np.float64, 'count4':np.int16, 
                            'min5':np.float64, 'max5':np.float64, 'count5':np.int16, 
                            'min6':np.float64, 'max6':np.float64, 'count6':np.int16, 
                            'min7':np.float64, 'max7':np.float64, 'count7':np.int16, 
                            'min8':np.float64, 'max8':np.float64, 'count8':np.int16, 
                            'min9':np.float64, 'max9':np.float64, 'count9':np.int16, 
                            'min10':np.float64, 'max10':np.float64, 'count10':np.int16},
                   kwargs={}
                  )

It returns this error:

TypingError                               Traceback (most recent call last)
Input In [30], in <cell line: 2>()
      1 import numpy as np
----> 2 df = df.apply_rows(profile,
      3                    incols={'Close':'Close', 'Date':'Date'},
      4                    outcols={'min1':np.float64, 'max1':np.float64, 'count1':np.int16,
      5                             'min2':np.float64, 'max2':np.float64, 'count2':np.int16, 
      6                             'min3':np.float64, 'max3':np.float64, 'count3':np.int16, 
      7                             'min4':np.float64, 'max4':np.float64, 'count4':np.int16, 
      8                             'min5':np.float64, 'max5':np.float64, 'count5':np.int16, 
      9                             'min6':np.float64, 'max6':np.float64, 'count6':np.int16, 
     10                             'min7':np.float64, 'max7':np.float64, 'count7':np.int16, 
     11                             'min8':np.float64, 'max8':np.float64, 'count8':np.int16, 
     12                             'min9':np.float64, 'max9':np.float64, 'count9':np.int16, 
     13                             'min10':np.float64, 'max10':np.float64, 'count10':np.int16},
     14                    kwargs={}
     15                   )

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/core/dataframe.py:4002, in DataFrame.apply_rows(self, func, incols, outcols, kwargs, pessimistic_nulls, cache_key)
   3995     if is_string_dtype(current_col_dtype) or is_categorical_dtype(
   3996         current_col_dtype
   3997     ):
   3998         raise TypeError(
   3999             "User defined functions are currently not "
   4000             "supported on Series with dtypes `str` and `category`."
   4001         )
-> 4002 return applyutils.apply_rows(
   4003     self,
   4004     func,
   4005     incols,
   4006     outcols,
   4007     kwargs,
   4008     pessimistic_nulls,
   4009     cache_key=cache_key,
   4010 )

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:78, in apply_rows(df, func, incols, outcols, kwargs, pessimistic_nulls, cache_key)
     69 """Row-wise transformation
     70 
     71 Parameters
     72 ----------
     73 {params}
     74 """
     75 applyrows = ApplyRowsCompiler(
     76     func, incols, outcols, kwargs, pessimistic_nulls, cache_key=cache_key
     77 )
---> 78 return applyrows.run(df)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:163, in ApplyKernelCompilerBase.run(self, df, **launch_params)
    161 bound = self.sig.bind(**args)
    162 # Launch kernel
--> 163 self.launch_kernel(df, bound.args, **launch_params)
    164 # Prepare pessimistic nullmask
    165 if self.pessimistic_nulls:

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:190, in ApplyRowsCompiler.launch_kernel(self, df, args)
    189 def launch_kernel(self, df, args):
--> 190     self.kernel.forall(len(df))(*args)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:336, in ForAll.__call__(self, *args)
    334     kernel = self.kernel
    335 else:
--> 336     kernel = self.kernel.specialize(*args)
    337 blockdim = self._compute_thread_per_block(kernel)
    338 griddim = (self.ntasks + blockdim - 1) // blockdim

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:974, in Dispatcher.specialize(self, *args)
    972 targetoptions = self.targetoptions
    973 targetoptions['link'] = self.link
--> 974 specialization = Dispatcher(self.py_func, [types.void(*argtypes)],
    975                             targetoptions)
    976 self.specializations[cc, argtypes] = specialization
    977 return specialization

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:807, in Dispatcher.__init__(self, py_func, sigs, targetoptions)
    805         self.compile_device(argtypes)
    806     else:
--> 807         self.compile(sigs[0])
    809     self._can_compile = False
    811 if targetoptions.get('device'):

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:1086, in Dispatcher.compile(self, sig)
   1084 if not self._can_compile:
   1085     raise RuntimeError("Compilation disabled")
-> 1086 kernel = _Kernel(self.py_func, argtypes, link=self.link,
   1087                  **self.targetoptions)
   1088 # Inspired by _DispatcherBase.add_overload, but differs slightly
   1089 # because we're inserting a _Kernel object instead of a compiled
   1090 # function.
   1091 c_sig = [a._code for a in argtypes]

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
     32 @functools.wraps(func)
     33 def _acquire_compile_lock(*args, **kwargs):
     34     with self:
---> 35         return func(*args, **kwargs)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:392, in _Kernel.__init__(self, py_func, argtypes, link, debug, lineinfo, inline, fastmath, extensions, max_registers, opt, device)
    383 self.extensions = extensions or []
    385 nvvm_options = {
    386     'debug': self.debug,
    387     'lineinfo': self.lineinfo,
    388     'fastmath': fastmath,
    389     'opt': 3 if opt else 0
    390 }
--> 392 cres = compile_cuda(self.py_func, types.void, self.argtypes,
    393                     debug=self.debug,
    394                     lineinfo=self.lineinfo,
    395                     inline=inline,
    396                     fastmath=fastmath,
    397                     nvvm_options=nvvm_options)
    398 tgt_ctx = cres.target_context
    399 code = self.py_func.__code__

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
     32 @functools.wraps(func)
     33 def _acquire_compile_lock(*args, **kwargs):
     34     with self:
---> 35         return func(*args, **kwargs)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:202, in compile_cuda(pyfunc, return_type, args, debug, lineinfo, inline, fastmath, nvvm_options)
    199     flags.nvvm_options = nvvm_options
    201 # Run compilation pipeline
--> 202 cres = compiler.compile_extra(typingctx=typingctx,
    203                               targetctx=targetctx,
    204                               func=pyfunc,
    205                               args=args,
    206                               return_type=return_type,
    207                               flags=flags,
    208                               locals={},
    209                               pipeline_class=CUDACompiler)
    211 library = cres.library
    212 library.finalize()

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:693, in compile_extra(typingctx, targetctx, func, args, return_type, flags, locals, library, pipeline_class)
    669 """Compiler entry point
    670 
    671 Parameter
   (...)
    689     compiler pipeline
    690 """
    691 pipeline = pipeline_class(typingctx, targetctx, library,
    692                           args, return_type, flags, locals)
--> 693 return pipeline.compile_extra(func)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:429, in CompilerBase.compile_extra(self, func)
    427 self.state.lifted = ()
    428 self.state.lifted_from = None
--> 429 return self._compile_bytecode()

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:497, in CompilerBase._compile_bytecode(self)
    493 """
    494 Populate and run pipeline for bytecode input
    495 """
    496 assert self.state.func_ir is None
--> 497 return self._compile_core()

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:476, in CompilerBase._compile_core(self)
    474         self.state.status.fail_reason = e
    475         if is_final_pipeline:
--> 476             raise e
    477 else:
    478     raise CompilerError("All available pipelines exhausted")

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:463, in CompilerBase._compile_core(self)
    461 res = None
    462 try:
--> 463     pm.run(self.state)
    464     if self.state.cr is not None:
    465         break

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:353, in PassManager.run(self, state)
    350 msg = "Failed in %s mode pipeline (step: %s)" % \
    351     (self.pipeline_name, pass_desc)
    352 patched_exception = self._patch_error(msg, e)
--> 353 raise patched_exception

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:341, in PassManager.run(self, state)
    339 pass_inst = _pass_registry.get(pss).pass_inst
    340 if isinstance(pass_inst, CompilerPass):
--> 341     self._runPass(idx, pass_inst, state)
    342 else:
    343     raise BaseException("Legacy pass in use")

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
     32 @functools.wraps(func)
     33 def _acquire_compile_lock(*args, **kwargs):
     34     with self:
---> 35         return func(*args, **kwargs)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:296, in PassManager._runPass(self, index, pss, internal_state)
    294     mutated |= check(pss.run_initialization, internal_state)
    295 with SimpleTimer() as pass_time:
--> 296     mutated |= check(pss.run_pass, internal_state)
    297 with SimpleTimer() as finalize_time:
    298     mutated |= check(pss.run_finalizer, internal_state)

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:269, in PassManager._runPass.<locals>.check(func, compiler_state)
    268 def check(func, compiler_state):
--> 269     mangled = func(compiler_state)
    270     if mangled not in (True, False):
    271         msg = ("CompilerPass implementations should return True/False. "
    272                "CompilerPass with name '%s' did not.")

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typed_passes.py:105, in BaseTypeInference.run_pass(self, state)
     99 """
    100 Type inference and legalization
    101 """
    102 with fallback_context(state, 'Function "%s" failed type inference'
    103                       % (state.func_id.func_name,)):
    104     # Type inference
--> 105     typemap, return_type, calltypes, errs = type_inference_stage(
    106         state.typingctx,
    107         state.targetctx,
    108         state.func_ir,
    109         state.args,
    110         state.return_type,
    111         state.locals,
    112         raise_errors=self._raise_errors)
    113     state.typemap = typemap
    114     # save errors in case of partial typing

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typed_passes.py:83, in type_inference_stage(typingctx, targetctx, interp, args, return_type, locals, raise_errors)
     81     infer.build_constraint()
     82     # return errors in case of partial typing
---> 83     errs = infer.propagate(raise_errors=raise_errors)
     84     typemap, restype, calltypes = infer.unify(raise_errors=raise_errors)
     86 # Output all Numba warnings

File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typeinfer.py:1086, in TypeInferer.propagate(self, raise_errors)
   1083 force_lit_args = [e for e in errors
   1084                   if isinstance(e, ForceLiteralArg)]
   1085 if not force_lit_args:
-> 1086     raise errors[0]
   1087 else:
   1088     raise reduce(operator.or_, force_lit_args)

TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
Unknown attribute 'shift' of type datetime64[ns]

File "../../../../../tmp/ipykernel_2389/2211239783.py", line 12:
<source missing, REPL/exec in use?>

During: typing of get attribute at /tmp/ipykernel_2389/2211239783.py (12)

File "../../../../../tmp/ipykernel_2389/2211239783.py", line 12:
<source missing, REPL/exec in use?>

During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f6e11b652c0>)
During: typing of call at <string> (37)


File "<string>", line 37:
<source missing, REPL/exec in use?>

Does anyone know how I can do something equivalent to shift(-1) inside this function? I need to compare whether the current Date differs from the previous one, so the counters and values reset whenever a new day starts.

If someone has a better approach for this function, I'd be glad to hear it.

Thank you

Trying with [i-1] instead:

if date != date[i-1]:


TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
No implementation of function Function(<built-in function getitem>) found for signature:
 
 >>> getitem(datetime64[ns], int64)
 
There are 22 candidate implementations:
      - Of which 22 did not match due to:
      Overload of function 'getitem': File: <numerous>: Line N/A.
        With argument(s): '(datetime64[ns], int64)':
       No match.

During: typing of intrinsic-call at /tmp/ipykernel_2311/51191800.py (12)

File "../../../../../tmp/ipykernel_2311/51191800.py", line 12:
<source missing, REPL/exec in use?>

During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f5edc1f5b80>)
During: typing of call at <string> (37)


File "<string>", line 37:
<source missing, REPL/exec in use?>

I converted the Date column to an integer with:

df['Date'] = df['DateTime'].dt.strftime('%Y%m%d')
df['Date'] = cudf.to_numeric(df['Date'])

which gives the error:

TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
Unknown attribute 'append' of type list(undefined)<iv=None>

File "../../../../../tmp/ipykernel_2338/3901380751.py", line 13:
<source missing, REPL/exec in use?>

During: typing of get attribute at /tmp/ipykernel_2338/3901380751.py (13)

File "../../../../../tmp/ipykernel_2338/3901380751.py", line 13:
<source missing, REPL/exec in use?>

During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f861aaeda40>)
During: typing of call at <string> (37)


File "<string>", line 37:
<source missing, REPL/exec in use?>

Does that mean I need to change how the lists are created before I can append to them?

Any help with this function would be highly appreciated.


Solution

    1. Looking at the docs for nopython mode: "the reflection process can be expensive for large lists and it is not supported for lists that contain reflected data types. Users cannot use list-of-list as an argument because of this limitation." https://numba.pydata.org/numba-doc/dev/reference/pysupported.html#list-reflection — so the plain Python lists (and the pandas DataFrame) you build inside the kernel won't type-check.
    2. You aren't applying the APIs efficiently. The applies and for loops make the code complex and fight against the parallel execution model this is designed for. You really just want to group by Date, bin by price, take the min, max and count per bin, and then combine the results into an aggregate DataFrame with your desired columns — see the sketch below.
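As a rough illustration of point 2, here is a minimal sketch of that groupby approach. It is not from the original post: the tiny example DataFrame and the column names day_min, day_max and bin are made up for illustration, and it assumes each Date spans more than one price so the bin width is non-zero.

import cudf

# Hypothetical intraday data: several Close values per Date
df = cudf.DataFrame({
    "Date": ["2022-01-03"] * 5 + ["2022-01-04"] * 5,
    "Close": [10.0, 10.5, 11.2, 10.8, 11.9, 12.0, 12.4, 11.7, 12.9, 12.2],
})

# Per-day low/high, merged back so every row knows its day's price range
day_stats = df.groupby("Date")["Close"].agg(["min", "max"]).reset_index()
day_stats.columns = ["Date", "day_min", "day_max"]
df = df.merge(day_stats, on="Date")

# Split each day's range into 10 equal-width bins and assign every Close to one
# (vectorised, no Python loop; assumes day_max > day_min so the width is non-zero)
bin_width = (df["day_max"] - df["day_min"]) / 10
df["bin"] = ((df["Close"] - df["day_min"]) // bin_width).clip(upper=9).astype("int32")

# min, max and count of Close per (Date, bin); reshape to wide columns afterwards
# if you want the min1..min10 / max1..max10 / count1..count10 layout
profile = (
    df.groupby(["Date", "bin"])["Close"]
    .agg(["min", "max", "count"])
    .reset_index()
)
print(profile)

This keeps everything inside cuDF's native groupby/merge machinery, which the GPU parallelises well, instead of compiling per-row Python with apply_rows.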