Tags: python, python-3.x, events, event-handling, listener

How do I create an event bus in Python?


I wrote a script that tracks Twitter follows. Most of its work happens in two functions. The first, track_user(old_followers: dict, screen_name: str) -> dict, returns a dict describing new followers and unfollowers. The second takes that dict, parses it, and converts it to human-readable text.

My predicament is this: I want other users of the script to be able to plug in their own handlers, kept separate from my code.

Here is an example of what I have in mind:

# My code

def track_user(old_followers: dict, screen_name: str) -> dict:
    final_dict = tracking_code_here()
    SomeEventBus.emit(TrackUserEvent(final_dict))
    return final_dict

# Some user's code (listening for the event emitted above)
def convert_tracking_to_text(event: TrackUserEvent):
    perform_some_text_conversion(event.final_dict)

# Maybe a decorator can be used in this case instead
MyScript.EventBus.AddListener(TrackUserEvent, convert_tracking_to_text)  

Hopefully, this makes sense. I just want my script to emit events that others can import and listen to.


Solution

  • Some time ago I wrote a library for handling async events. With it, you could do something like this:

    # creates a main event manager
    manager = EventManager()
    
    # creates an event
    track_user_event = manager.create_event("track_user_event")
    
    async def track_user(old_followers: dict, screen_name: str) -> dict:
        final_dict = tracking_code_here()
        
        # emit event here
        await track_user_event(TrackUserEvent(final_dict))  # could also be: await manager.raise_event("track_user_event")
    
        return final_dict
    
    @track_user_event.as_callback()
    async def convert_tracking_to_text(event: TrackUserEvent): 
        perform_some_text_conversion(event.final_dict)
        return
    

    It's far from perfect, but I hope it helps.
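
If a dependency is more than you need, a small synchronous bus can be written with the standard library alone. The following is a minimal sketch, not the library mentioned above: the EventBus class, its add_listener, listener, and emit methods, and the sample data inside track_user are all assumptions made for illustration. Only TrackUserEvent, final_dict, track_user, and convert_tracking_to_text come from the question.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class TrackUserEvent:
    """Event payload; the field name follows the question's example."""
    final_dict: dict


class EventBus:
    """Hypothetical minimal bus: listeners are stored per event class."""

    def __init__(self) -> None:
        self._listeners: Dict[type, List[Callable]] = {}

    def add_listener(self, event_type: type, callback: Callable) -> None:
        # Register callback to run whenever an event of event_type is emitted.
        self._listeners.setdefault(event_type, []).append(callback)

    def listener(self, event_type: type) -> Callable:
        # Decorator form of add_listener; returns the function unchanged.
        def decorator(callback: Callable) -> Callable:
            self.add_listener(event_type, callback)
            return callback
        return decorator

    def emit(self, event: object) -> None:
        # Call listeners in registration order. Iterate over a copy so a
        # listener may register further listeners during dispatch.
        for callback in list(self._listeners.get(type(event), [])):
            callback(event)


# --- Script author's side ---
bus = EventBus()


def track_user(old_followers: dict, screen_name: str) -> dict:
    # Stand-in for the real tracking code, which the question does not show.
    final_dict = {"screen_name": screen_name, "new": ["alice"], "lost": []}
    bus.emit(TrackUserEvent(final_dict))
    return final_dict


# --- Some user's side ---
messages: List[str] = []


@bus.listener(TrackUserEvent)
def convert_tracking_to_text(event: TrackUserEvent) -> None:
    d = event.final_dict
    messages.append(
        f"{d['screen_name']}: +{len(d['new'])} / -{len(d['lost'])} followers"
    )


track_user({}, "example_user")
```

Other users would import bus and TrackUserEvent from your module and register their own functions, either with the decorator or with bus.add_listener(TrackUserEvent, some_function). Dispatch here is synchronous and matches on the exact event class; an exception raised by one listener stops the remaining ones, so wrap the callback call in try/except if listeners should be isolated from each other.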