Search code examples
pythonmultithreadingsemaphoreros

How to implement synchronization in multi threading with python and ROS Services?


Im trying to implement multi threading (parallel processing) with python and using mutex threading. I have first process that check the Pressure Value and the modem update(in the code implemented with odom_callback and callback_modem functions), and second process that calls ROS SERVICES ( in the code implemented with ros_serice_server server and imu_client client functions). Here is the implementation code in python

#!/usr/bin/env python3

from __future__ import print_function
import rospy
import numpy as np
from os import system
import time
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import threading 
import time
import serial
import serial.tools.list_ports
from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import ImuValue,ImuValueResponse

P0 = 1.01325 #Default Pressure 
mutex = threading.Lock()
Communication_Mode_ = 0
pub_pressure = rospy.Publisher('depth',Vector3,queue_size=1)
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def handle_ros_services(req):
    mutex.acquire(blocking=True)
    print("Server Read Data:")
    global T0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #print("Server Read Data:")
    T0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    T=T0
    temperature = T
    current_x_orientation_s = temperature
    print("Returning Service Temperature Data", current_x_orientation_s)
    return ImuValueResponse(current_x_orientation_s, True)
    mutex.release()

def ros_serice_server():
    s = rospy.Service('imu_value', ImuValue, handle_ros_services)
    print("Ready to get_value")

def odom_callback():
    # reentrang processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. send msg to serial port
    global P0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #P1 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P1 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    #P0 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    P = P0 # Relative Measured Pressure
    feedback =Vector3()
    feedback.x = 0    #Angular Velocity
    feedback.y = 0
    feedback.z = P/9.81 #Depth
    pressure = feedback.z
    print("Pressure : ", pressure)
    pub_pressure.publish(feedback)
    # reentrant processing
    mutex.release()
    
def callback_modem(event):
    # reentrant processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. check for incoming data
    event = Serial.Serial_Port_Receive_Data(20,0.2)
    if (event == 1): # Received data from acoustic modem
        modem_data= event
        pub_modem.publish(modem_data)
        print("received ")
    else:
        print("not received...... ")
    mutex.release()
 
if __name__ == '__main__':
    # initialize serial port here
    Serial.Serial_Port_Standard()
    rospy.init_node('imu_value')
    ros_serice_server()
    rospy.Timer(rospy.Duration(1), callback_modem) 
    while not rospy.is_shutdown():
        try:
            odom_callback()
        except:
            print('pass')

And the client node

#!/usr/bin/env python3

from __future__ import print_function
import rospy
import sys
import numpy as np
from os import system
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import time
import serial
import serial.tools.list_ports

from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import *

mutex = threading.Lock()
Communication_Mode_ = 0
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def imu_client():
    mutex.acquire(blocking=True)
    rospy.wait_for_service('imu_value')
    imu_value = rospy.ServiceProxy('imu_value', ImuValue)
    print("Request call send")
    resp1 = imu_value(0.05)
    return resp1.current_x_orientation_s
    mutex.release()

if __name__ == "__main__":
    rospy.init_node('client_node_f')
    while not rospy.is_shutdown():
        try:
            print("entering client")
            value = imu_client()
            print(value)
            time.sleep(1)
        except:
            print('pass')

So the output is following. The output of the first process with the ROS Services Server is

Pressure :  0.10602446483180428
Server Read Data:
Returning Service Temperature Data 1.0401

And then after calling the client I got

entering client
Request call send
1.0401
entering client

The problem is that after calling the ROS SERVICE client node the process stop so doesn't continue with the first process (Pressure value and modem update) . The ROS SERVICES process should be call only on demand and should HALT the first process (Pressure and modem) and then is should resume with the work. So, do I need to implement SEMAPHORES for the ROS SERVICES call ? If yes how it should be in the code. So I do need kind of synchronization , right?Please any help?


Solution

  • Your problem is:

    def handle_ros_services(req):
        mutex.acquire(blocking=True)
        ...
        return ImuValueResponse(current_x_orientation_s, True)
        mutex.release()
    

    Because of the return statement, the release is never executed.

    You need at the end:

        value = ImValueResponse(...)
        mutex.release()
        return value
    

    Even better would be to use your mutex as part of a with statement:

    with mutex:
        do anything you want, knowing that the lock will be released
        at the end, even if you return or throw an exception.