Search code examples
pythonzeromqdistributed-computingpython-asyncioreq

pyzmq REQ/REP with asyncio await for variable


I'm playing for the first time with asyncio in python and trying to combine it with ZMQ.

Basically, my issue is that I have a REP/REQ system running inside an async def, together with a function I need to await. Somehow the value is not updated. Here's a snippet of the code to illustrate that:

# Create the asyncio-aware ZeroMQ context (zmq_asyncio is the zmq.asyncio module),
# then open a REP socket and bind it on TCP port 5558 on all interfaces.
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

I send this object to a class and get it back in this function

async def readsonar(self, trigger_pin, REP_server_django):
    """Answer every incoming REQ with the latest sonar reading, forever.

    :param trigger_pin: pin passed to ``self.board.sonar_read``
    :param REP_server_django: asyncio REP socket whose ``recv``/``send``
                              are awaitable
    """
    i = 0
    while True:
        # A REP socket must receive a request before it may reply; the
        # request payload itself is not used.
        await REP_server_django.recv()                     # line.1
        value = await self.board.sonar_read(trigger_pin)   # line.2
        print(value)                                       # line.3
        json_data = json.dumps(value)                      # line.4
        # json_data = json.dumps(i)                        # line.4bis
        # send() on an asyncio socket is awaitable: without the await the
        # reply is not guaranteed to have completed before the next recv().
        await REP_server_django.send(json_data.encode())   # line.5
        i += 1                                             # line.6
        await asyncio.sleep(1 / 1000)                      # line.7

sonar_read uses pymata_express to read an ultrasonic sensor. If I comment line.2 and line.4, I get the right value for i. If I comment line.1 and line.5, the print(value) prints the correct value from sonar_read. However, when I run it as shown here, the value is not updated.

Am I missing something?


EDIT :
Edited a typo regarding the line comments. What I meant is that if I only read the sonar and print the value, it works fine. If I only .recv() and .send(json.dumps(i).encode()), it works. But if I try to send the value from the sonar, it locks to a given value which is not updated.


EDIT2 : (answer to Alan Yorinks): here is the MWE, it considers what you sent regarding the declaration of zmq in the class. It is taken from the pymata_express example concurrent_tasks.py

To reproduce the error, run these two scripts in two different terminals. You will need an Arduino board with FirmataExpress installed. If all runs well, PART A. should only spit out the same value on the mve_req.py end. You may edit the different blocks (PARTS A, B or C) to see the behaviour.

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENT_TASKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:
    """Serve sonar readings from a PymataExpress board over a ZeroMQ REP socket.

    Constructing an instance binds the REP socket on TCP port 5558 and then
    runs the board's event loop until ``async_init_and_run`` finishes, so the
    constructor blocks for as long as the reader task is alive.
    """

    def __init__(self, board):
        """
        :param board: board object providing ``get_event_loop()``,
                      ``set_pin_mode_sonar()`` and ``sonar_read()``
                      (a PymataExpress instance in this example)
        """
        self.loop = board.get_event_loop()
        self.board = board

        # Synchronous context; not used by any method of this class, kept so
        # the attribute remains available.
        self.ctxsync = zmq.Context()
        # Use the alias the module actually imports (zmq.asyncio as zmq_asyncio).
        self.context = zmq_asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        # Run on the loop obtained from the board instead of relying on a
        # module-level global named ``loop``, which only exists when this file
        # is executed as a script.
        # NOTE(review): assumes board.get_event_loop() returns the same loop
        # the script creates with asyncio.get_event_loop() -- confirm.
        self.loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        """Reply to each incoming request with the JSON-encoded sonar value.

        Loops forever: wait for a request on the REP socket, read the sonar
        through the board, print the value, send it back as JSON, then yield
        to the event loop for one millisecond.
        """
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):
        """Configure the sonar pins, then run the reader task until it ends."""

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    # Script entry point: obtain the event loop and create the board object.
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        # The constructor itself drives the event loop (it calls
        # run_until_complete), so this call blocks while the tasks run.
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        # On Ctrl-C or a runtime error, shut the board down before exiting.
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        # Always release the event loop, whether or not an error occurred.
        loop.close()

mve_req.py

import zmq
import time
import json

def start_zmq():
    """Open a REQ socket connected to tcp://localhost:5558.

    :return: tuple ``(socket, context)`` -- the connected REQ socket and the
             ZeroMQ context that owns it
    """
    ctx = zmq.Context()
    req_socket = ctx.socket(zmq.REQ)
    req_socket.connect("tcp://localhost:5558")
    return req_socket, ctx

def get_sonar(REQ_django):
    """Send one request on the socket and return the decoded reply.

    :param REQ_django: socket-like object providing ``send`` and ``recv``
    :return: the reply bytes decoded to ``str``
    """
    REQ_django.send(b"server_django")
    return REQ_django.recv().decode()

if __name__ == '__main__':
    # Poll the REP server once per second and print the collected data.

    data = {"sensors": {}}

    REQ_django, context = start_zmq()
    try:
        while True:
            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the polling loop.
        pass
    finally:
        # The loop never ends on its own, so cleanup must live in ``finally``
        # to be reachable. linger=0 discards any unsent request so that
        # context.term() cannot block when the server is not running.
        REQ_django.close(linger=0)
        context.term()

Solution

  • In full disclosure, I am the author of pymata-express and python-banyan. The OP requested that I post this solution, so this is not meant to be a shameless plug.

    I have been developing with asyncio since it was first introduced in Python 3. When asyncio code works, asyncio (IMHO) can simplify concurrency and the code. However, when things go awry, it can be frustrating to debug and understand the cause of the issues.

    I apologize ahead of time, since this may be a little lengthy, but I need to provide some background information so that the example will not seem like some random bit of code.

    The python-banyan framework was developed to provide an alternative to threading, multi-processing, and asyncio. Simply put, a Banyan application consists of small targeted executables that communicate with one another using protocol messages that are shared over a LAN. At its core it uses ZeroMQ. It was not designed to have traffic move over the WAN, but to use a LAN as a "software backplane." In some ways, Banyan is similar to MQTT, but it is much faster when used within a LAN. It does have the capability to connect to an MQTT network if that is desirable.

    Part of Banyan is a concept called OneGPIO. It is a protocol messaging specification that abstracts GPIO functionality to be independent of any hardware implementation. To implement the hardware specifics, specialized Banyan components, called Banyan Hardware Gateways, were developed. There are gateways available for the Raspberry Pi, Arduino, ESP-8266 and Adafruit Crickit Hat. A GPIO application publishes the generic OneGPIO messages that any or all of the gateways can elect to receive. To move from one hardware platform to another, the hardware-associated gateway is launched, and without modification, the control component (which is the code shown below) is launched. To go from one hardware platform to another, there are no code modifications necessary for any of the components; neither the control component nor the gateway is modified. Variables, such as pin numbers, may be specified through command line options when launching the control component. For the Arduino Gateway, pymata-express is used to control the GPIO of the Arduino. Pymata-express is an asyncio implementation of a StandardFirmata client. The thing to note is that the code below is not asyncio. The Banyan framework allows one to develop using the tools that fit the problem, yet allow decoupling of parts of the solution, and in this case, the application allows the mixing of asyncio with non-asyncio without any of the headaches normally encountered in doing so.

    In the code provided, all the code below the class definition is used to provide support for command-line configuration options.

    import argparse
    import signal
    import sys
    import threading
    import time
    
    from python_banyan.banyan_base import BanyanBase
    
    
    class HCSR04(BanyanBase, threading.Thread):
        """Banyan component that polls an HC-SR04 sonar via the Arduino gateway.

        The main thread runs the Banyan receive loop and records the latest
        distance reported on 'from_arduino_gateway'; a daemon thread
        republishes that value on 'distance_poll' every ``poll_interval``
        seconds.
        """

        def __init__(self, **kwargs):
            """
            kwargs contains the following parameters
            :param back_plane_ip_address: If none, the local IP address is used
            :param process_name: HCSR04
            :param publisher_port: publishing port
            :param subscriber_port: subscriber port
            :param loop_time: receive loop idle time
            :param trigger_pin: GPIO trigger pin number
            :param echo_pin: GPIO echo pin number
            :param poll_interval: seconds between 'distance_poll' publications
            """

            # No trailing commas here: a trailing comma would silently store
            # a one-element tuple instead of the value itself.
            self.back_plane_ip_address = kwargs['back_plane_ip_address']
            self.process_name = kwargs['process_name']
            self.publisher_port = kwargs['publisher_port']
            self.subscriber_port = kwargs['subscriber_port']
            self.loop_time = kwargs['loop_time']
            self.trigger_pin = kwargs['trigger_pin']
            self.echo_pin = kwargs['echo_pin']
            self.poll_interval = kwargs['poll_interval']

            self.last_distance_value = 0

            # initialize the base class
            super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                         subscriber_port=kwargs['subscriber_port'],
                                         publisher_port=kwargs['publisher_port'],
                                         process_name=kwargs['process_name'],
                                         loop_time=kwargs['loop_time'])

            threading.Thread.__init__(self)
            self.daemon = True

            # guards last_distance_value, which is written by the receive
            # loop and read by the polling thread
            self.lock = threading.Lock()

            # subscribe to receive messages from arduino gateway
            self.set_subscriber_topic('from_arduino_gateway')

            # enable hc-sr04 in arduino gateway
            payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                       'echo_pin': self.echo_pin}
            self.publish_payload(payload, 'to_arduino_gateway')

            # start the thread
            self.start()

            # The constructor does not return normally: it runs the receive
            # loop until interrupted, then cleans up and exits the process.
            try:
                self.receive_loop()
            except KeyboardInterrupt:
                self.clean_up()
                sys.exit(0)

        def incoming_message_processing(self, topic, payload):
            """Print the incoming message and record its 'value' as the latest distance.

            :param topic: topic the message arrived on
            :param payload: message dictionary; its 'value' entry is stored
            """
            print(topic, payload)
            with self.lock:
                self.last_distance_value = payload['value']

        def run(self):
            """Thread body: publish the latest distance every ``poll_interval`` seconds."""
            while True:
                # Copy under the lock, publish outside it to keep the
                # critical section short.
                with self.lock:
                    distance = self.last_distance_value
                payload = {'distance': distance}
                topic = 'distance_poll'
                self.publish_payload(payload, topic)
                time.sleep(self.poll_interval)
    
    
    def hcsr04():
        """Parse the command-line options and launch the HCSR04 component.

        All options arrive as strings from the command line and are converted
        to their working types when the keyword dictionary is built.
        """
        parser = argparse.ArgumentParser()
        # allow user to bypass the IP address auto-discovery.
        # This is necessary if the component resides on a computer
        # other than the computing running the backplane.
        parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                            help="None or IP address used by Back Plane")
        parser.add_argument("-i", dest="poll_interval", default=1.0,
                            help="Distance polling interval")
        parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                            help="Set process name in banner")
        parser.add_argument("-p", dest="publisher_port", default="43124",
                            help="Publisher IP port")
        parser.add_argument("-s", dest="subscriber_port", default="43125",
                            help="Subscriber IP port")
        parser.add_argument("-t", dest="loop_time", default=".1",
                            help="Event Loop Timer in seconds")
        parser.add_argument("-x", dest="trigger_pin", default="12",
                            help="Trigger GPIO pin number")
        parser.add_argument("-y", dest="echo_pin", default="13",
                            help="Echo GPIO pin number")

        args = parser.parse_args()

        if args.back_plane_ip_address == 'None':
            args.back_plane_ip_address = None
        kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                      'publisher_port': args.publisher_port,
                      'subscriber_port': args.subscriber_port,
                      'process_name': args.process_name,
                      'loop_time': float(args.loop_time),
                      'trigger_pin': int(args.trigger_pin),
                      'echo_pin': int(args.echo_pin),
                      # float, not int: the interval is a duration in seconds
                      # (default 1.0), and int("0.5") would raise ValueError
                      # for a fractional value given on the command line.
                      'poll_interval': float(args.poll_interval)
                      }

        # replace with the name of your class
        HCSR04(**kw_options)
    
    
    # signal handler function called when Control-C occurs
    def signal_handler(sig, frame):
        """Announce the shutdown, then raise KeyboardInterrupt.

        :param sig: signal number delivered to the process (unused)
        :param frame: stack frame active when the signal arrived (unused)
        """
        print('Exiting Through Signal Handler')
        raise KeyboardInterrupt()
    
    
    # route both SIGINT and SIGTERM through signal_handler
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, signal_handler)

    if __name__ == '__main__':
        hcsr04()