python-3.x apache-kafka faust

How to share a Faust table between multiple agents or Faust timers?


I'm trying to publish a Faust table's data (a count) to a Kafka topic at a fixed time interval. The timer works when I publish a simple string, but it somehow cannot access the table's data. Here is the code for the timer and the agent:

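# Assumes app, anomaly_topic, page_view_topic, PageView and the page_views
# table are defined elsewhere; those definitions are not shown in the question.

@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
    # Every 10 seconds, publish the running total to the anomaly topic.
    await anomaly_topic.send(
        value=str(page_views['total'].value())
    )


@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        # Count this view, then recompute the total over all other keys.
        page_views[view.id] += 1
        total = 0
        for key in list(page_views.keys()):
            if key != 'total':
                total += page_views[key].value()
        page_views['total'] = total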

The agent is working fine. I am able to see the values correctly.


Solution

  • After a lot of experimentation, it turned out that you can't access a table's values from an app timer (even if you specify the relative_to_field option when creating the table). A workaround is to create a second table that stores the timestamps of your messages, and to use those timestamps in your business logic inside the agent:

       if view.timestamp - page_views_timer[view.id + '_first_timestamp'] > 60:
           await anomaly_topic.send(value={})  # placeholder: the data to be sent
    

    Here page_views_timer is the newly created table.
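
    The timestamp check in this workaround can be tried out without a Kafka broker. The following is only a minimal sketch, not the original author's code: a plain dict stands in for the page_views_timer table, and the names should_publish, first_timestamp_key and THRESHOLD_SECONDS are hypothetical helpers introduced for illustration. It assumes view.timestamp is a number of seconds, and it assumes that the stored timestamp should be reset after each publish, which the answer does not state.

```python
from typing import Dict, MutableMapping

THRESHOLD_SECONDS = 60.0  # same 60-second gap as in the snippet above


def first_timestamp_key(view_id: str) -> str:
    """Build the key under which a view id's first timestamp is stored."""
    return view_id + '_first_timestamp'


def should_publish(
    timer_table: MutableMapping[str, float],
    view_id: str,
    timestamp: float,
    threshold: float = THRESHOLD_SECONDS,
) -> bool:
    """Return True when more than `threshold` seconds have passed since the
    first message recorded for `view_id`, restarting the interval if so."""
    key = first_timestamp_key(view_id)
    first = timer_table.get(key)
    if first is None:
        # First message for this id: remember when it arrived.
        timer_table[key] = timestamp
        return False
    if timestamp - first > threshold:
        # Interval elapsed (assumption: start a new one from this message).
        timer_table[key] = timestamp
        return True
    return False


# A plain dict stands in for the Faust table in this sketch.
page_views_timer: Dict[str, float] = {}
results = [
    should_publish(page_views_timer, 'home', t)
    for t in (0.0, 30.0, 61.0, 90.0, 122.0)
]
print(results)  # [False, False, True, False, True]
```

    Inside the agent, the same check would run once per incoming view, with the message sent to anomaly_topic whenever it returns True. Because the decision is driven by message timestamps rather than a wall-clock timer, nothing is published while no messages arrive.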