Search code examples
python-3.xcan-bus

How to install a rx callback with python-can?


I'd like to have a function to receive incoming CANbus frames asynchronously. I wrote the following, but the following issue:

    can.rxcallback(0, callback)
AttributeError: module 'can' has no attribute 'rxcallback'

My code is as follow:

def reception():
    global count
    data = [0, 0, 0, memoryview(bytearray(8))]

    def callback(bus, reason):
        global count
        count += 1
        bus.recv(0, list=data)
        if reason == 0:
            pass
        elif reason == 1:
            pass  # fifo full
        elif reason == 2:
            # fifo overflow
            raise Exception("fifo overflow")

    can.rxcallback(0, callback)

Solution

  • Taken straight form python-can's documentation:

    def print_message(msg: can.Message) -> None:
        """Regular callback function. Can also be a coroutine."""
        print(msg)
    
    
    notifier = can.Notifier(bus, [print_message])
    ...
    notifier.stop()
    

    You create an instance of Notifier and register a list of listeners.

    A listener is either a function taking a parameter of type can.Message or an instance of Listener.

    After you have received all messages, call stop() on the notifier.


    There is no method like rxcallback or similar on python-can.