I am trying to adapt this answer to my needs. I am trying to write a small program to operate some lab equipment, and instead of a prerecorded video, I want to show the output of a camera. That part works well using the following code:
import numpy as np
import pandas as pd
import holoviews as hv
hv.extension('bokeh')
from holoviews.streams import Pipe, Buffer
from tornado.ioloop import IOLoop
from tornado import gen
import cv2
from instrumental.drivers.cameras import uc480
instruments = uc480.list_instruments()
@gen.coroutine
def f():
#async def f():
while cam.is_open:
frame = cam.grab_image(timeout='10s', copy=True, exposure_time='10ms')
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
yield pipe.send(rgb)
#await pipe.send(rgb)
cv2.destroyAllWindows()
cam = uc480.UC480_Camera(instruments[0],reopen_policy='reuse')
cam.start_live_video(framerate = "10Hz")
frame0 = cam.grab_image(timeout='10s', copy=True, exposure_time='10ms')
rgb = cv2.cvtColor(frame0, cv2.COLOR_BGR2RGBA)
pipe = Pipe(data=rgb)
#camera_task = asyncio.gather(f())#doesn't work?
camera_loop = IOLoop.current().add_callback(f)
hv.DynamicMap(hv.RGB, streams=[pipe])
I have little experience outside of writing small scripts, so I chose to use Panel for my simple UI, and asyncio
to make everything running smoothly.
I started to modify the code some more in order to better understand it, yet so far I failed. My questions are the following:
asyncio
provides similar/identical funcionality. I would very much like to use only asyncio
if possible, or does tornado add anything substantial in this case?@gen.coroutine
decorator and the yield
keyword with async
and await
, which I already know from asyncio
, but when doing that, the loop never starts. How do I start the loop then in the correct way?asyncio
I would just .cancel()
the task, but in this case that didn't work.edit: a little more information:
panel serve
(which shows everything in a browser tab, running a tornado server in the background if I understand correctly)If all you want to do is view the camera, I think you might be better off ditching tornado and Panel and simply running a small web server (e.g. Flask) and streaming the video in a web page. It doesn't sound like you actually want to use any of the features of Panel or Holoview.
However, since you asked, this is how you could use asyncio in place of tornado to show an mp4 video
import asyncio
import cv2
import holoviews as hv
from holoviews.streams import Pipe
hv.extension("bokeh")
# Use a video file for testing
from pathlib import Path
video_path = Path("path/to/a/file.mp3")
async def f():
vd = cv2.VideoCapture(str(video_path))
frames = int(vd.get(cv2.CAP_PROP_FRAME_COUNT))
while frames > 0:
ret, frame = vd.read()
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pipe.send(rgb)
frames -= 1
await asyncio.sleep(1)
vd.release()
cv2.destroyAllWindows()
pipe = Pipe(data=[])
so I imagine that you could do very similar for your webcam.