Search code examples
pythonpython-3.xopencvcomputer-visionros

How to use OpenCV video capture with custom video input?


I am trying to use the LK Optical Flow from this tutorial, to get some motion estimation into a robot simulation made with ROS+Gazebo.

I could manage to make properly the bridge between ROS and OpenCV via cv_bridge, as per the code below, and could implement some sample features which work "frame-by-frame" without major issues.

However, the optical flow tutorial reference seems to accept only video inputs, such as webcam and/or saved video files, and this is where I got stuck.

How could I apply the LK Optical flow in a "frame-by-frame" approach, or configure my cv_bridge to act as a "custom" camera device?

This is my cv_brige so far:

#!/usr/bin/env python

import roslib
import rospy
import sys
import cv2
import numpy as np

from sensor_msgs.msg import Image, CameraInfo
from cv_bridge import CvBridge, CvBridgeError


class OpticalFlow(object):



    def __init__(self):

        #Setting up the bridge and the subscriber
        self.bridge = CvBridge()
        self.image_sub = rospy.Subscriber("/realsense/color/image_raw", Image,self.image_callback)
        
        # Parameters for Good Features Detection
        self.feature_params = dict( maxCorners = 100,
                                qualityLevel = 0.3,
                                minDistance = 7,
                                blockSize = 7 )

        # Parameters for Lucas-Kanade optical flow
        self.lk_params = dict( winSize  = (150,150),
                                maxLevel = 2,
                                criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

        # Create some random colors for the tracking points
        self.color = np.random.randint(0,255,(100,3))



    def image_callback(self,ros_image):

        # Using cv_bridge() to convert the ROS image to OpenCV format
        try:
            cv_image = self.bridge.imgmsg_to_cv2(ros_image, "bgr8")#bgr8
            hsv_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2HSV)
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
            
        except CvBridgeError as e:
            print(e)
    

        cv2.imshow("Robot Image", cv_image) # The actual (non-processed image from the simulation/robot)
        #cv2.imshow('Image HSV', hsv_image)
        #cv2.imshow('Image Gray', gray)
        frame = np.array(cv_image, dtype=np.uint8)


        # Calling the Optical Flow Function
        display = self.optical_flow(frame,self.feature_params,self.lk_params,self.color)


    
    def optical_flow(self,frame,feature_params,lk_params,color):
        
        
        old_frame = frame
        old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
        p0 = cv2.goodFeaturesToTrack(old_gray, mask = None, **feature_params)
        mask = np.zeros_like(old_frame)
        while(1):
            
            newframe = frame
            frame_gray = cv2.cvtColor(newframe, cv2.COLOR_BGR2GRAY)

            # Calculating the optical flow
            p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
            
            # Select good points
            good_new = p1[st==1]
            good_old = p0[st==1]

            # Draw the tracks
            for i,(new,old) in enumerate(zip(good_new, good_old)):
                a,b = new.ravel()
                c,d = old.ravel()
                mask = cv2.line(mask, (a,b),(c,d), color[i].tolist(), 2)
                frame = cv2.circle(frame,(a,b),5,color[i].tolist(),-1)
            img = cv2.add(frame,mask)
            
            cv2.imshow('LK_Flow',img)                    
            k = cv2.waitKey(30) & 0xff
            if k == 27:
                break

            # Now update the previous frame and previous points
            old_gray = frame_gray.copy()
            p0 = good_new.reshape(-1,1,2)
            


def main():

    optical_flow_object = OpticalFlow()
    rospy.init_node('KLT_Node', anonymous=True)
    rospy.loginfo("\nWaiting for image topics...\n...")
    try:
        rospy.spin()
    except KeyboardInterrupt:
        rospy.loginfo("\nShutting Down...\n...")
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

I also would like to know, if possible, where I can find more information about how to manipulate those parameters (from the tutorial):

# params for ShiTomasi corner detection
feature_params = dict( maxCorners = 100,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )
# Parameters for lucas kanade optical flow
lk_params = dict( winSize  = (15,15),
                  maxLevel = 2,
                  criteria = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03))

Solution

  • I would recommend using cv2.calcOpticalFlowPyrLK(old_frame, cur_frame, ...) or cv2.calcOpticalFlowFarneback(old_frame, cur_frame, ...) (dense optical flow). There is a bunch of information about these methods on the cv2 website. From personal experience, these methods work great!

    Let me know if you have any questions or problems!