I have a cudf df with Close and Date columns, where Close is float64 and Date is (%Y-%m-%d) datetime64.
I wanted to define a function that takes those columns as inputs and creates what is known as Market Profile, as Data is granular, in same Date there's a lot of Close's. Info would came out of function as 30 new columns, defining min, max & count for each range of price, that is updated on each row, until the day changes and values reset to restart again.
def profile(Close,Date,min1,min2,min3,min4,min5,min6,min7,min8,min9,min10,
max1,max2,max3,max4,max5,max6,max7,max8,max9,max10,
count1,count2,count3,count4,count5,count6,count7,count8,count9,count10):
values = []
counters=[]
for i, (price, date) in enumerate(zip(Close,Date)):
if date != date.shift(-1):
values = []
counters=[]
cl = int(price)
values.append(cl)
counters.append(cl.count())
range = (max(values) - min(values))/10
df = pd.DataFrame({'Value':values,'Index':counters})
min1 = min(values)
min2 = min1+range
min3 = min2+range
min4 = min3+range
min5 = min4+range
min6 = min5+range
min7 = min6+range
min8 = min7+range
min9 = min8+range
min10 = min9+range
max1 = min2
max2 = min3
max3 = min4
max4 = min5
max5 = min6
max6 = min7
max7 = min8
max8 = min9
max9 = min10
max10 = min10+range
expr1 = 'Value >= @min1 and Value < @max1'
expr2 = 'Value >= @min2 and Value < @max2'
expr3 = 'Value >= @min3 and Value < @max3'
expr4 = 'Value >= @min4 and Value < @max4'
expr5 = 'Value >= @min5 and Value < @max5'
expr6 = 'Value >= @min6 and Value < @max6'
expr7 = 'Value >= @min7 and Value < @max7'
expr8 = 'Value >= @min8 and Value < @max8'
expr9 = 'Value >= @min9 and Value < @max9'
expr10 = 'Value >= @min10 and Value < @max10'
df1 = df.query(expr1)
count1[i] = df1.Index.sum()
df2 = df.query(expr2)
count2[i] = df2.Index.sum()
df3 = df.query(expr3)
count3[i] = df3.Index.sum()
df4 = df.query(expr4)
count4[i] = df4.Index.sum()
df5 = df.query(expr5)
count5[i] = df5.Index.sum()
df6 = df.query(expr6)
count6[i] = df6.Index.sum()
df7 = df.query(expr7)
count7[i] = df7.Index.sum()
df8 = df.query(expr8)
count8[i] = df8.Index.sum()
df9 = df.query(expr9)
count9[i] = df9.Index.sum()
df10 = df.query(expr10)
count10[i] = df10.Index.sum()
min1[i] = min1
min2[i] = min2
min3[i] = min3
min4[i] = min4
min5[i] = min5
min6[i] = min6
min7[i] = min7
min8[i] = min8
min9[i] = min9
min10[i] = min10
max1[i] = max1
max2[i] = max2
max3[i] = max3
max4[i] = max4
max5[i] = max5
max6[i] = max6
max7[i] = max7
max8[i] = max8
max9[i] = max9
max10[i] = max10
Function was applied as:
import numpy as np
df = df.apply_rows(profile,
incols={'Close':'Close', 'Date':'Date'},
outcols={'min1':np.float64, 'max1':np.float64, 'count1':np.int16,
'min2':np.float64, 'max2':np.float64, 'count2':np.int16,
'min3':np.float64, 'max3':np.float64, 'count3':np.int16,
'min4':np.float64, 'max4':np.float64, 'count4':np.int16,
'min5':np.float64, 'max5':np.float64, 'count5':np.int16,
'min6':np.float64, 'max6':np.float64, 'count6':np.int16,
'min7':np.float64, 'max7':np.float64, 'count7':np.int16,
'min8':np.float64, 'max8':np.float64, 'count8':np.int16,
'min9':np.float64, 'max9':np.float64, 'count9':np.int16,
'min10':np.float64, 'max10':np.float64, 'count10':np.int16},
kwargs={}
)
It returns the error :
TypingError Traceback (most recent call last)
Input In [30], in <cell line: 2>()
1 import numpy as np
----> 2 df = df.apply_rows(profile,
3 incols={'Close':'Close', 'Date':'Date'},
4 outcols={'min1':np.float64, 'max1':np.float64, 'count1':np.int16,
5 'min2':np.float64, 'max2':np.float64, 'count2':np.int16,
6 'min3':np.float64, 'max3':np.float64, 'count3':np.int16,
7 'min4':np.float64, 'max4':np.float64, 'count4':np.int16,
8 'min5':np.float64, 'max5':np.float64, 'count5':np.int16,
9 'min6':np.float64, 'max6':np.float64, 'count6':np.int16,
10 'min7':np.float64, 'max7':np.float64, 'count7':np.int16,
11 'min8':np.float64, 'max8':np.float64, 'count8':np.int16,
12 'min9':np.float64, 'max9':np.float64, 'count9':np.int16,
13 'min10':np.float64, 'max10':np.float64, 'count10':np.int16},
14 kwargs={}
15 )
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/core/dataframe.py:4002, in DataFrame.apply_rows(self, func, incols, outcols, kwargs, pessimistic_nulls, cache_key)
3995 if is_string_dtype(current_col_dtype) or is_categorical_dtype(
3996 current_col_dtype
3997 ):
3998 raise TypeError(
3999 "User defined functions are currently not "
4000 "supported on Series with dtypes `str` and `category`."
4001 )
-> 4002 return applyutils.apply_rows(
4003 self,
4004 func,
4005 incols,
4006 outcols,
4007 kwargs,
4008 pessimistic_nulls,
4009 cache_key=cache_key,
4010 )
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:78, in apply_rows(df, func, incols, outcols, kwargs, pessimistic_nulls, cache_key)
69 """Row-wise transformation
70
71 Parameters
72 ----------
73 {params}
74 """
75 applyrows = ApplyRowsCompiler(
76 func, incols, outcols, kwargs, pessimistic_nulls, cache_key=cache_key
77 )
---> 78 return applyrows.run(df)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:163, in ApplyKernelCompilerBase.run(self, df, **launch_params)
161 bound = self.sig.bind(**args)
162 # Launch kernel
--> 163 self.launch_kernel(df, bound.args, **launch_params)
164 # Prepare pessimistic nullmask
165 if self.pessimistic_nulls:
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/cudf/utils/applyutils.py:190, in ApplyRowsCompiler.launch_kernel(self, df, args)
189 def launch_kernel(self, df, args):
--> 190 self.kernel.forall(len(df))(*args)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:336, in ForAll.__call__(self, *args)
334 kernel = self.kernel
335 else:
--> 336 kernel = self.kernel.specialize(*args)
337 blockdim = self._compute_thread_per_block(kernel)
338 griddim = (self.ntasks + blockdim - 1) // blockdim
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:974, in Dispatcher.specialize(self, *args)
972 targetoptions = self.targetoptions
973 targetoptions['link'] = self.link
--> 974 specialization = Dispatcher(self.py_func, [types.void(*argtypes)],
975 targetoptions)
976 self.specializations[cc, argtypes] = specialization
977 return specialization
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:807, in Dispatcher.__init__(self, py_func, sigs, targetoptions)
805 self.compile_device(argtypes)
806 else:
--> 807 self.compile(sigs[0])
809 self._can_compile = False
811 if targetoptions.get('device'):
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:1086, in Dispatcher.compile(self, sig)
1084 if not self._can_compile:
1085 raise RuntimeError("Compilation disabled")
-> 1086 kernel = _Kernel(self.py_func, argtypes, link=self.link,
1087 **self.targetoptions)
1088 # Inspired by _DispatcherBase.add_overload, but differs slightly
1089 # because we're inserting a _Kernel object instead of a compiled
1090 # function.
1091 c_sig = [a._code for a in argtypes]
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
32 @functools.wraps(func)
33 def _acquire_compile_lock(*args, **kwargs):
34 with self:
---> 35 return func(*args, **kwargs)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:392, in _Kernel.__init__(self, py_func, argtypes, link, debug, lineinfo, inline, fastmath, extensions, max_registers, opt, device)
383 self.extensions = extensions or []
385 nvvm_options = {
386 'debug': self.debug,
387 'lineinfo': self.lineinfo,
388 'fastmath': fastmath,
389 'opt': 3 if opt else 0
390 }
--> 392 cres = compile_cuda(self.py_func, types.void, self.argtypes,
393 debug=self.debug,
394 lineinfo=self.lineinfo,
395 inline=inline,
396 fastmath=fastmath,
397 nvvm_options=nvvm_options)
398 tgt_ctx = cres.target_context
399 code = self.py_func.__code__
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
32 @functools.wraps(func)
33 def _acquire_compile_lock(*args, **kwargs):
34 with self:
---> 35 return func(*args, **kwargs)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/cuda/compiler.py:202, in compile_cuda(pyfunc, return_type, args, debug, lineinfo, inline, fastmath, nvvm_options)
199 flags.nvvm_options = nvvm_options
201 # Run compilation pipeline
--> 202 cres = compiler.compile_extra(typingctx=typingctx,
203 targetctx=targetctx,
204 func=pyfunc,
205 args=args,
206 return_type=return_type,
207 flags=flags,
208 locals={},
209 pipeline_class=CUDACompiler)
211 library = cres.library
212 library.finalize()
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:693, in compile_extra(typingctx, targetctx, func, args, return_type, flags, locals, library, pipeline_class)
669 """Compiler entry point
670
671 Parameter
(...)
689 compiler pipeline
690 """
691 pipeline = pipeline_class(typingctx, targetctx, library,
692 args, return_type, flags, locals)
--> 693 return pipeline.compile_extra(func)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:429, in CompilerBase.compile_extra(self, func)
427 self.state.lifted = ()
428 self.state.lifted_from = None
--> 429 return self._compile_bytecode()
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:497, in CompilerBase._compile_bytecode(self)
493 """
494 Populate and run pipeline for bytecode input
495 """
496 assert self.state.func_ir is None
--> 497 return self._compile_core()
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:476, in CompilerBase._compile_core(self)
474 self.state.status.fail_reason = e
475 if is_final_pipeline:
--> 476 raise e
477 else:
478 raise CompilerError("All available pipelines exhausted")
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler.py:463, in CompilerBase._compile_core(self)
461 res = None
462 try:
--> 463 pm.run(self.state)
464 if self.state.cr is not None:
465 break
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:353, in PassManager.run(self, state)
350 msg = "Failed in %s mode pipeline (step: %s)" % \
351 (self.pipeline_name, pass_desc)
352 patched_exception = self._patch_error(msg, e)
--> 353 raise patched_exception
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:341, in PassManager.run(self, state)
339 pass_inst = _pass_registry.get(pss).pass_inst
340 if isinstance(pass_inst, CompilerPass):
--> 341 self._runPass(idx, pass_inst, state)
342 else:
343 raise BaseException("Legacy pass in use")
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_lock.py:35, in _CompilerLock.__call__.<locals>._acquire_compile_lock(*args, **kwargs)
32 @functools.wraps(func)
33 def _acquire_compile_lock(*args, **kwargs):
34 with self:
---> 35 return func(*args, **kwargs)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:296, in PassManager._runPass(self, index, pss, internal_state)
294 mutated |= check(pss.run_initialization, internal_state)
295 with SimpleTimer() as pass_time:
--> 296 mutated |= check(pss.run_pass, internal_state)
297 with SimpleTimer() as finalize_time:
298 mutated |= check(pss.run_finalizer, internal_state)
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/compiler_machinery.py:269, in PassManager._runPass.<locals>.check(func, compiler_state)
268 def check(func, compiler_state):
--> 269 mangled = func(compiler_state)
270 if mangled not in (True, False):
271 msg = ("CompilerPass implementations should return True/False. "
272 "CompilerPass with name '%s' did not.")
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typed_passes.py:105, in BaseTypeInference.run_pass(self, state)
99 """
100 Type inference and legalization
101 """
102 with fallback_context(state, 'Function "%s" failed type inference'
103 % (state.func_id.func_name,)):
104 # Type inference
--> 105 typemap, return_type, calltypes, errs = type_inference_stage(
106 state.typingctx,
107 state.targetctx,
108 state.func_ir,
109 state.args,
110 state.return_type,
111 state.locals,
112 raise_errors=self._raise_errors)
113 state.typemap = typemap
114 # save errors in case of partial typing
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typed_passes.py:83, in type_inference_stage(typingctx, targetctx, interp, args, return_type, locals, raise_errors)
81 infer.build_constraint()
82 # return errors in case of partial typing
---> 83 errs = infer.propagate(raise_errors=raise_errors)
84 typemap, restype, calltypes = infer.unify(raise_errors=raise_errors)
86 # Output all Numba warnings
File ~/anaconda3/envs/rapids-22.02/lib/python3.9/site-packages/numba/core/typeinfer.py:1086, in TypeInferer.propagate(self, raise_errors)
1083 force_lit_args = [e for e in errors
1084 if isinstance(e, ForceLiteralArg)]
1085 if not force_lit_args:
-> 1086 raise errors[0]
1087 else:
1088 raise reduce(operator.or_, force_lit_args)
TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
Unknown attribute 'shift' of type datetime64[ns]
File "../../../../../tmp/ipykernel_2389/2211239783.py", line 12:
<source missing, REPL/exec in use?>
During: typing of get attribute at /tmp/ipykernel_2389/2211239783.py (12)
File "../../../../../tmp/ipykernel_2389/2211239783.py", line 12:
<source missing, REPL/exec in use?>
During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f6e11b652c0>)
During: typing of call at <string> (37)
File "<string>", line 37:
<source missing, REPL/exec in use?>
Anyone knows how I can perform a similar function to shift(-1) in this function? To compare if current Date is different from previous, to reset Counters and Values for each new day.
If someone thinks that have better solution for function would be great to hear.
Thank you
Trying with [i-1]:
if date != date[i-1]:
TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
No implementation of function Function(<built-in function getitem>) found for signature:
>>> getitem(datetime64[ns], int64)
There are 22 candidate implementations:
- Of which 22 did not match due to:
Overload of function 'getitem': File: <numerous>: Line N/A.
With argument(s): '(datetime64[ns], int64)':
No match.
During: typing of intrinsic-call at /tmp/ipykernel_2311/51191800.py (12)
File "../../../../../tmp/ipykernel_2311/51191800.py", line 12:
<source missing, REPL/exec in use?>
During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f5edc1f5b80>)
During: typing of call at <string> (37)
File "<string>", line 37:
<source missing, REPL/exec in use?>
Converted df date to int with:
df['Date'] = df['DateTime'].dt.strftime('%Y%m%d')
df['Date'] = cudf.to_numeric(df['Date'])
gives the error :
TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
Unknown attribute 'append' of type list(undefined)<iv=None>
File "../../../../../tmp/ipykernel_2338/3901380751.py", line 13:
<source missing, REPL/exec in use?>
During: typing of get attribute at /tmp/ipykernel_2338/3901380751.py (13)
File "../../../../../tmp/ipykernel_2338/3901380751.py", line 13:
<source missing, REPL/exec in use?>
During: resolving callee type: type(<numba.cuda.compiler.Dispatcher object at 0x7f861aaeda40>)
During: typing of call at <string> (37)
File "<string>", line 37:
<source missing, REPL/exec in use?>
Does it mean I need to do some change in the list creation to append?
Any help about the function would be highly valuable
nopython
mode, it says "the reflection process can be expensive for large lists and it is not supported for lists that contain reflected data types. Users cannot use list-of-list as an argument because of this limitation." https://numba.pydata.org/numba-doc/dev/reference/pysupported.html#list-reflection