python opencv ffmpeg h.264 pyav

How to use pyav or opencv to decode a live stream of raw H.264 data?


The data is received over a socket with no container around it: it is a pure stream of I, P and B frames, each beginning with a NAL start code (something like 00 00 00 01). I am currently using PyAV to decode the frames, but I can only decode data once the second PPS (in a key frame) has been received, so that the chunk I send to my decode thread begins with the SPS and PPS. Otherwise decode() or demux() fails with the error "non-existing PPS 0 referenced decode_slice_header error".
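
For readers who want to see why a chunk has to start at the SPS/PPS, here is a minimal sketch that scans an Annex B byte stream for start codes and reads each NAL unit type (the low five bits of the byte after the start code: 7 is SPS, 8 is PPS, 5 is an IDR slice). The helper names `nal_unit_types` and `first_sps_offset` are made up for illustration and are not part of PyAV or OpenCV.

```python
import re

# A 4-byte start code (00 00 00 01) is a 3-byte one with an extra leading zero,
# so matching the 3-byte form finds both.
START_CODE = re.compile(b"\x00\x00\x01")

SPS, PPS, IDR = 7, 8, 5  # H.264 nal_unit_type values


def nal_unit_types(data):
    """Return the nal_unit_type of every NAL unit found in `data`."""
    types = []
    for match in START_CODE.finditer(data):
        header_index = match.end()
        if header_index < len(data):
            # The type is stored in the low 5 bits of the NAL header byte.
            types.append(data[header_index] & 0x1F)
    return types


def first_sps_offset(data):
    """Return the offset of the start code of the first SPS, or -1 if none."""
    for match in START_CODE.finditer(data):
        header_index = match.end()
        if header_index < len(data) and data[header_index] & 0x1F == SPS:
            start = match.start()
            # Include the extra zero byte of a 4-byte start code.
            if start > 0 and data[start - 1] == 0:
                start -= 1
            return start
    return -1
```

Dropping everything before `first_sps_offset(buffer)` is one way to make sure the first bytes handed to a decoder contain the parameter sets it needs.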

I want to feed data to a persistent decoder that remembers the previous P frame, so that after I feed it one B frame it returns a decoded video frame. Alternatively, I need some form of IO object that can be opened as a container while another thread keeps writing data into it.

Here is my key code:

# read thread: read until a key frame arrives, then start a new io.BytesIO() to store the new data
rawFrames = io.BytesIO()
while flag_get_keyFrame:
    ....
    content= socket.recv(2048)
    rawFrames.write(content)
    ....

#decode thread... decode content between two key frames
....
rawFrames.seek(0)
container = av.open(rawFrames)
for packet in container.demux():
    for frame in packet.decode():
        self.frames.append(frame)
....

My code plays the video, but with a 3 to 4 second delay. I am not posting all of it here because I know it does not do what I actually want. I want to start playing the video as soon as the first key frame arrives and decode each following frame right after receiving it. How can I achieve this with PyAV, OpenCV, FFmpeg, or something else?


Solution

  • After spending hours looking for an answer to this as well, I figured it out myself.

    For a single thread, you can do the following:

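    # Fragment: assumes it runs inside an async method with `websocket` and `self.frames` available.
    rawData = io.BytesIO()
    container = av.open(rawData, format="h264", mode='r')
    cur_pos = 0  # number of bytes already consumed as packets
    while True:
        data = await websocket.recv()
        # Write the new chunk at the buffer's current position.
        rawData.write(data)
        # Rewind to the first byte that has not been demuxed yet.
        rawData.seek(cur_pos)
        for packet in container.demux():
            if packet.size == 0:
                continue
            cur_pos += packet.size
            for frame in packet.decode():
                self.frames.append(frame)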
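
As an alternative sketch of the "persistent decoder" the question asks for, PyAV's `CodecContext` can be used without a container: `parse()` splits raw bytes into packets and `decode()` keeps the reference frames between calls. This is an illustration under assumptions, not the code I ended up using: the function name `decode_stream` and the optional `codec` argument are made up here, and skipping every failed packet with a broad `except` is a simplification.

```python
def decode_stream(chunks, codec=None):
    """Yield decoded frames from an iterable of raw H.264 byte chunks."""
    if codec is None:
        # Imported lazily so the helper can be exercised with a stand-in codec.
        import av
        codec = av.CodecContext.create("h264", "r")
    for chunk in chunks:
        # parse() buffers partial NAL units and returns only complete packets.
        for packet in codec.parse(chunk):
            try:
                frames = codec.decode(packet)
            except Exception:
                # Assumption: packets that arrive before the first SPS/PPS
                # fail to decode and can simply be dropped.
                continue
            for frame in frames:
                yield frame
```

A typical call would look like `for frame in decode_stream(iter(lambda: sock.recv(2048), b"")): ...`, where `sock` is a connected socket. The parser usually holds back the last packet until it sees the start of the next one, so expect roughly one frame of latency.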
    

    That is the basic idea. I have also worked out a more general version that separates receiving from decoding (the decoder runs in its own process). If the CPU cannot keep up with decoding, the code skips frames and resumes decoding at the next key frame (so you do not get the torn green-screen effect). Here is the full version of the code:

    import asyncio
    import av
    import cv2
    import io
    from multiprocessing import Process, Queue, Event
    import time
    import websockets
    
    def display_frame(frame, start_time, pts_offset, frame_rate):
        if frame.pts is not None:
            play_time = (frame.pts - pts_offset) * frame.time_base.numerator / frame.time_base.denominator
            if start_time is not None:
                current_time = time.time() - start_time
                time_diff = play_time - current_time
                if time_diff > 1 / frame_rate:
                    return False
                if time_diff > 0:
                    time.sleep(time_diff)
        img = frame.to_ndarray(format='bgr24')
        cv2.imshow('Video', img)
        return True
    
    def get_pts(frame):
        return frame.pts
    
    def render(terminated, data_queue):
        rawData = io.BytesIO()
        cur_pos = 0
        frames_buffer = []
        start_time = None
        pts_offset = None
        got_key_frame = False
        while not terminated.is_set():
            try:
                data = data_queue.get_nowait()
            except Exception:  # queue.Empty: nothing has been received yet
                time.sleep(0.01)
                continue
            rawData.write(data)
            rawData.seek(cur_pos)
            if cur_pos == 0:
                container = av.open(rawData, mode='r')
                original_codec_ctx = container.streams.video[0].codec_context
                codec = av.codec.CodecContext.create(original_codec_ctx.name, 'r')
            cur_pos += len(data)
            dts = None
            for packet in container.demux():
                if packet.size == 0:
                    continue
                dts = packet.dts
                if pts_offset is None:
                    pts_offset = packet.pts
                if not got_key_frame and packet.is_keyframe:
                    got_key_frame = True
                if data_queue.qsize() > 8 and not packet.is_keyframe:
                    got_key_frame = False
                    continue
                if not got_key_frame:
                    continue
                frames = codec.decode(packet)
                if start_time is None:
                    start_time = time.time()
                frames_buffer += frames
                frames_buffer.sort(key=get_pts)
                for frame in list(frames_buffer):  # iterate over a copy because frames are removed below
                    if display_frame(frame, start_time, pts_offset, codec.framerate):
                        frames_buffer.remove(frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            if dts is not None:
                container.seek(25000)
            rawData.seek(cur_pos)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        terminated.set()
        cv2.destroyAllWindows()
    
    async def receive_encoded_video(websocket, path):
        data_queue = Queue()
        terminated = Event()
        p = Process(
            target=render,
            args=(terminated, data_queue)
        )
        p.start()
        while not terminated.is_set():
            try:
                data = await websocket.recv()
            except Exception:  # connection closed or receive error
                break
            data_queue.put(data)
        terminated.set()
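
The frame-skipping rule inside `render()` can be pulled out into a small state machine, which makes it easier to reason about and to test. This is only a sketch: the class name `KeyframeGate` and its `max_backlog` parameter are made up here, with the default of 8 taken from the `data_queue.qsize() > 8` check above.

```python
class KeyframeGate:
    """Decide whether a packet should be decoded or skipped."""

    def __init__(self, max_backlog=8):
        self.max_backlog = max_backlog
        self.synced = False  # True once decoding may proceed

    def should_decode(self, is_keyframe, backlog):
        if is_keyframe:
            # A key frame needs no earlier reference frames, so it is always safe.
            self.synced = True
        elif backlog > self.max_backlog:
            # Falling behind: drop frames until the next key frame arrives.
            self.synced = False
        return self.synced
```

In the loop above this would replace the `got_key_frame` bookkeeping with `if not gate.should_decode(packet.is_keyframe, data_queue.qsize()): continue`.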