Search code examples
java streaming red5

Could not render red5 recorded media stream


I have a problem playing back a media file recorded from a stream published to Red5; my code is below. I can see that a file called out.flv is created, but this out.flv cannot be played back.

/**
 * Stand-alone test client that connects to an RTMP application, plays a stream by name and
 * writes the stream events it receives as tags into an FLV file.
 *
 * <p>All state lives in static fields shared by the nested callback and timer-task classes.
 * The sequence started from {@link #main(String[])} is: connect, then (2 s after a successful
 * connect) subscribe, then (1 s later) create a stream, then play it.
 */
public class Red5ClientTest {

    /** Schedules {@code BandwidthStatusTask} and {@code PlayStatusTask}; never cancelled in this class. */
    private static Timer timer;

    /** The RTMP client used for the connection, the subscribe call and playback. */
    private static RTMPClient client;

    /** Name of the stream to subscribe to and play. */
    private static String sourceStreamName;

    /** Running sum of the timestamps of all video events seen so far; used as the video tag timestamp. */
    private static int videoTs;

    /** Running sum of the timestamps of all audio events seen so far; used as the audio tag timestamp. */
    private static int audioTs;

    /** Writer for the output FLV file; closed when the connection is closed. */
    private static FLVWriter writer;

    // Not referenced anywhere else in this class.
    private static int bytesRead =0;

    /**
     * Opens the output file, registers the event/close/exception handlers on the client and
     * issues the connect call.
     *
     * @param args command-line arguments (not used)
     * @throws IOException if the output file cannot be created
     */
    public static void main(String[] args) throws IOException {
        String sourceHost = "localhost";
        int sourcePort = 1935;
        String sourceApp = "oflaDemo";
        sourceStreamName = "myStream";
        timer = new Timer();        

        client = new RTMPClient();

        String path = "c:\\temp\\out.flv";
        File file = new File(path);
        if (!file.exists()) {
            file.createNewFile();
        }
        // NOTE(review): the second argument is presumably the "append" flag — confirm against
        // the FLVWriter API; if so, re-running appends to a previous recording.
        writer = new FLVWriter(file,true);

        // Every stream event is handed to StreamEventDispatcher, which writes it to the file.
        client.setStreamEventDispatcher(new StreamEventDispatcher());
        // Status handler: disconnects when the stream is missing, unpublished or finished.
        client.setStreamEventHandler(new INetStreamEventHandler() {
            public void onStreamEvent(Notify notify) {
                System.out.printf("onStreamEvent: %s\n", notify);
                // assumes the first call argument is a status map with a "code" entry — TODO confirm
                ObjectMap<?, ?> map = (ObjectMap<?, ?>) notify.getCall().getArguments()[0];
                String code = (String) map.get("code");
                System.out.printf("<:%s\n", code);
                if (StatusCodes.NS_PLAY_STREAMNOTFOUND.equals(code)) {
                    System.out.println("Requested stream was not found");
                    client.disconnect();

                }
                else if (StatusCodes.NS_PLAY_UNPUBLISHNOTIFY.equals(code)
                        || StatusCodes.NS_PLAY_COMPLETE.equals(code)) {
                    System.out.println("Source has stopped publishing or play is complete");
                    client.disconnect();
                }
            }
        });
        // On connection close: close the FLV writer, then terminate the JVM with status 0.
        client.setConnectionClosedHandler(new Runnable() {
            public void run() {
                if (writer != null) {
                    writer.close();

                }
                System.out.println("Source connection has been closed, proxy will be stopped");
                System.exit(0);
            }
        });
        // On any client exception: print it and terminate the JVM with status 1.
        // The writer is not closed on this path.
        client.setExceptionHandler(new ClientExceptionHandler() {
            @Override
            public void handleException(Throwable throwable) {
                throwable.printStackTrace();
                System.exit(1);
            }
        });

        // connect the consumer
        Map<String, Object> defParams = client.makeDefaultConnectionParams(sourceHost, sourcePort,
                sourceApp);
        // add pageurl and swfurl
        defParams.put("pageUrl", "");
        defParams.put("swfUrl", "app:/Red5-StreamRelay.swf");
        // indicate for the handshake to generate swf verification data
        client.setSwfVerification(true);
        // connect the client
        client.connect(sourceHost, sourcePort, defParams, new IPendingServiceCallback() {
            public void resultReceived(IPendingServiceCall call) {
                System.out.println("connectCallback");
                ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult();
                String code = (String) map.get("code");
                if ("NetConnection.Connect.Rejected".equals(code)) {
                    System.out.printf("Rejected: %s\n", map.get("description"));
                    client.disconnect();
                    //proxy.stop();
                }
                else if ("NetConnection.Connect.Success".equals(code)) {
                    // 1. Wait for onBWDone
                    // (a fixed 2 second delay; the task only prints the bandwidth-check state)
                    timer.schedule(new BandwidthStatusTask(), 2000L);
                    // 'result' is not used after this assignment.
                    Object result = call.getResult();
                    System.out.println("Red5ClientTest.main()");
                }
                else {
                    System.out.printf("Unhandled response code: %s\n", code);
                }
            }
        });

        // NOTE(review): nothing below blocks or sleeps; main() prints the message and returns
        // right after connect() has been called. The process is ended by the System.exit calls
        // in the handlers above.

        // kill the timer
        //timer.cancel();
        System.out.println("Stream relay exit");

    }

    /**
     * Handles result from subscribe call.
     * Only prints the call and its result.
     */
    private static final class SubscribeStreamCallBack implements IPendingServiceCallback {

        public void resultReceived(IPendingServiceCall call) {
            System.out.println("resultReceived: " + call);
            Object result = call.getResult();
            System.out.println("results came {}" + result);
        }

    }

    /**
     * Converts each dispatched event into a {@code Tag} and writes it with the shared
     * {@code writer}.
     */
    private static final class StreamEventDispatcher implements IEventDispatcher {

        /**
         * Builds a tag from the event's data type, accumulated timestamp and data buffer, then
         * writes it. Any exception is caught and printed; the event is then dropped.
         *
         * @param event the dispatched event; cast to {@code IRTMPEvent} and {@code IStreamData}
         *              without a prior type check
         */
        public void dispatchEvent(IEvent event) {
            System.out.println("ClientStream.dispachEvent()" + event.toString());
            try {

                //RTMPMessage build = RTMPMessage.build((IRTMPEvent) event);




                IRTMPEvent rtmpEvent = (IRTMPEvent) event;
                ITag tag = new Tag();
                tag.setDataType(rtmpEvent.getDataType());
                // The tag timestamp is the sum of all timestamps seen so far for that media kind.
                // NOTE(review): this is only right if the event timestamps are deltas — confirm.
                // Events that are neither video nor audio never get setTimestamp called.
                if (rtmpEvent instanceof VideoData) {
                    videoTs += rtmpEvent.getTimestamp();
                    tag.setTimestamp(videoTs);
                }
                else if (rtmpEvent instanceof AudioData) {
                    audioTs += rtmpEvent.getTimestamp();
                    tag.setTimestamp(audioTs);
                }


                // No instanceof check precedes this cast; a ClassCastException here is caught by
                // the outer catch below. Empty events are not filtered out either.
                IoBuffer data = ((IStreamData) rtmpEvent).getData().asReadOnlyBuffer();
                tag.setBodySize(data.limit());
                tag.setBody(data);
                try {
                    writer.writeTag(tag);

                } catch (Exception e) {
                    // Re-wrapped only to be caught and printed by the outer catch.
                    throw new RuntimeException(e);
                }
                System.out.println("writting....");    


            }
            catch (Exception e) {//IOException
                e.printStackTrace();
            }
        }

    }

    /**
     * Timer task run 2 s after a successful connect: prints the bandwidth-check state,
     * schedules {@code PlayStatusTask} and sends the subscribe call.
     */
    private static final class BandwidthStatusTask extends TimerTask {

        @Override
        public void run() {
            // check for onBWDone
            // (the value is only printed; the task proceeds regardless of it)
            System.out.println("Bandwidth check done: " + client.isBandwidthCheckDone());
            // cancel this task
            this.cancel();
            // create a task to wait for subscribed
            timer.schedule(new PlayStatusTask(), 1000L);
            // 2. send FCSubscribe
            client.subscribe(new SubscribeStreamCallBack(), new Object[] { sourceStreamName });
        }

    }

    /**
     * Timer task run 1 s after the subscribe call: prints the subscribed state and asks the
     * client to create a stream.
     */
    private static final class PlayStatusTask extends TimerTask {

        @Override
        public void run() {
            // checking subscribed
            // (the value is only printed; the task proceeds regardless of it)
            System.out.println("Subscribed: " + client.isSubscribed());
            // cancel this task
            this.cancel();
            // 3. create stream
            client.createStream(new CreateStreamCallback());
        }

    }

    /**
     * Creates a "stream" via playback, this is the source stream.
     */
    private static final class CreateStreamCallback implements IPendingServiceCallback {

        /**
         * Reads the new stream id from the call result and starts playback on it.
         *
         * @param call the completed createStream call; its result is read as a {@code Number}
         */
        public void resultReceived(IPendingServiceCall call) {
            System.out.println("resultReceived: " + call);
            int streamId = ((Number) call.getResult()).intValue();
            System.out.println("stream id: " + streamId);
            // Start playback. Names ending in .flv/.f4v/.mp4 are played with (0, -1), all other
            // names with (-1, 0).
            // NOTE(review): presumably these are start/length and -1 as start means "live" — confirm.
            if (sourceStreamName.endsWith(".flv") || sourceStreamName.endsWith(".f4v")
                    || sourceStreamName.endsWith(".mp4")) {
                client.play(streamId, sourceStreamName, 0, -1);
            }
            else {
                client.play(streamId, sourceStreamName, -1, 0);
            }
        }

    }

}

What could I possibly be doing wrong here?


Solution

  • Finally got it

    /**
     * Connects to an RTMP application, plays a stream by name and writes the received stream
     * data as tags into an FLV file obtained from {@code FLVService} (with metadata generation
     * enabled).
     *
     * <p>Sequence: {@link #recordStream()} opens the writer, registers the handlers and
     * connects; on a successful connect a stream is created; once created it is played; every
     * dispatched stream event is written to the file. When the connection is closed the writer
     * is closed and the JVM exits.
     */
    public class TeqniRTMPClient {
    
        private static final Logger logger = LoggerFactory.getLogger(TeqniRTMPClient.class);
    
        /** Output file used when no explicit path is passed to the constructor. */
        private static final String DEFAULT_OUTPUT_PATH = "c:\\temp\\out.flv";
    
        /**
         * Records {@code myStream} of the {@code oflaDemo} application on {@code localhost:1935}.
         *
         * @param args command-line arguments (not used)
         * @throws IOException if the output file cannot be created
         */
        public static void main(String args[]) throws IOException {
            TeqniRTMPClient client = new TeqniRTMPClient("localhost", 1935, "oflaDemo", "myStream");
            client.recordStream();
        }
    
        /** RTMP client; created in {@link #recordStream()}. */
        private RTMPClient client;
    
        /** Tag writer for the output file; created in {@link #recordStream()}. */
        private ITagWriter writer;
    
        private final String sourceHost;
    
        private final int sourcePort;
    
        private final String sourceApp;
    
        private final String sourceStreamName;
    
        /** Path of the FLV file the stream is written to. */
        private final String outputPath;
    
        /**
         * Creates a recorder that writes to the default output path.
         *
         * @param sourceHost       host of the RTMP server
         * @param sourcePort       port of the RTMP server
         * @param sourceApp        application name on the server
         * @param sourceStreamName name of the stream to play
         */
        public TeqniRTMPClient(String sourceHost, int sourcePort, String sourceApp,
                String sourceStreamName) {
            this(sourceHost, sourcePort, sourceApp, sourceStreamName, DEFAULT_OUTPUT_PATH);
        }
    
        /**
         * Creates a recorder that writes to the given output path.
         *
         * @param sourceHost       host of the RTMP server
         * @param sourcePort       port of the RTMP server
         * @param sourceApp        application name on the server
         * @param sourceStreamName name of the stream to play
         * @param outputPath       path of the FLV file to write; must not be {@code null}
         * @throws IllegalArgumentException if {@code outputPath} is {@code null}
         */
        public TeqniRTMPClient(String sourceHost, int sourcePort, String sourceApp,
                String sourceStreamName, String outputPath) {
            if (outputPath == null) {
                throw new IllegalArgumentException("outputPath must not be null");
            }
            this.sourceHost = sourceHost;
            this.sourcePort = sourcePort;
            this.sourceApp = sourceApp;
            this.sourceStreamName = sourceStreamName;
            this.outputPath = outputPath;
        }
    
        /**
         * Opens the output file, registers all handlers on a new client and issues the connect
         * call. Recording itself happens in the registered callbacks.
         *
         * @throws IOException      if the output file cannot be created
         * @throws RuntimeException if the FLV writer cannot be obtained
         */
        public void recordStream() throws IOException {
            client = new RTMPClient();
            writer = openWriter(new File(outputPath));
            registerHandlers();
            connect();
        }
    
        /** Creates the file if it is missing and returns a tag writer for it from FLVService. */
        private ITagWriter openWriter(File file) throws IOException {
            if (!file.exists()) {
                file.createNewFile();
            }
    
            FLVService flvService = new FLVService();
            flvService.setGenerateMetadata(true);
            try {
                IStreamableFile flv = flvService.getStreamableFile(file);
                return flv.getWriter();
            }
            catch (Exception e) {
                throw new RuntimeException("Could not open FLV writer for " + file, e);
            }
        }
    
        /** Registers the stream-data dispatcher and the status, exception and close handlers. */
        private void registerHandlers() {
            client.setStreamEventDispatcher(new StreamEventDispatcher());
    
            // Status handler: disconnect when the stream is missing, unpublished or finished.
            client.setStreamEventHandler(new INetStreamEventHandler() {
                @Override
                public void onStreamEvent(Notify notify) {
                    System.out.printf("onStreamEvent: %s\n", notify);
                    // assumes the first call argument is a status map with a "code" entry — TODO confirm
                    ObjectMap<?, ?> map = (ObjectMap<?, ?>) notify.getCall().getArguments()[0];
                    String code = (String) map.get("code");
                    System.out.printf("<:%s\n", code);
                    if (StatusCodes.NS_PLAY_STREAMNOTFOUND.equals(code)) {
                        System.out.println("Requested stream was not found");
                        client.disconnect();
                    }
                    else if (StatusCodes.NS_PLAY_UNPUBLISHNOTIFY.equals(code)
                            || StatusCodes.NS_PLAY_COMPLETE.equals(code)) {
                        System.out.println("Source has stopped publishing or play is complete");
                        client.disconnect();
                    }
                }
            });
    
            // Any client exception is fatal: log it and exit with status 1.
            client.setExceptionHandler(new ClientExceptionHandler() {
                @Override
                public void handleException(Throwable throwable) {
                    logger.error("RTMP client failed", throwable);
                    System.exit(1);
                }
            });
    
            // Closing the writer here is what completes the output file before the JVM exits.
            client.setConnectionClosedHandler(new Runnable() {
                @Override
                public void run() {
                    if (writer != null) {
                        writer.close();
                    }
                    System.out.println("Source connection has been closed, proxy will be stopped");
                    System.exit(0);
                }
            });
        }
    
        /** Builds the connection parameters and connects; a successful connect creates the stream. */
        private void connect() {
            // connect the consumer
            Map<String, Object> defParams = client.makeDefaultConnectionParams(sourceHost, sourcePort,
                    sourceApp);
            // add pageurl and swfurl
            defParams.put("pageUrl", "");
            defParams.put("swfUrl", "app:/Red5-StreamRelay.swf");
            // indicate for the handshake to generate swf verification data
            client.setSwfVerification(true);
            // connect the client
            client.connect(sourceHost, sourcePort, defParams, new IPendingServiceCallback() {
                @Override
                public void resultReceived(IPendingServiceCall call) {
                    System.out.println("connectCallback");
                    ObjectMap<?, ?> map = (ObjectMap<?, ?>) call.getResult();
                    String code = (String) map.get("code");
                    if ("NetConnection.Connect.Rejected".equals(code)) {
                        System.out.printf("Rejected: %s\n", map.get("description"));
                        client.disconnect();
                    }
                    else if ("NetConnection.Connect.Success".equals(code)) {
                        // Connected: create the stream right away; playback starts in
                        // CreateStreamCallback.
                        client.createStream(new CreateStreamCallback());
                        System.out.println("Red5ClientTest.main()");
                    }
                    else {
                        System.out.printf("Unhandled response code: %s\n", code);
                    }
                }
            });
        }
    
        /** Returns whether the stream name ends in one of the file extensions .flv, .f4v or .mp4. */
        private boolean isFileStreamName() {
            return sourceStreamName.endsWith(".flv") || sourceStreamName.endsWith(".f4v")
                    || sourceStreamName.endsWith(".mp4");
        }
    
        /**
         * Receives the result of the createStream call and starts playback on the new stream.
         */
        class CreateStreamCallback implements IPendingServiceCallback {
    
            /**
             * @param call the completed createStream call; its result is read as a {@code Number}
             */
            @Override
            public void resultReceived(IPendingServiceCall call) {
                System.out.println("resultReceived: " + call);
                int streamId = ((Number) call.getResult()).intValue();
                System.out.println("stream id: " + streamId);
                // File-like names are played with (0, -1), all other names with (-1, 0).
                // NOTE(review): presumably start/length, with -1 as start meaning "live" — confirm.
                if (isFileStreamName()) {
                    client.play(streamId, sourceStreamName, 0, -1);
                }
                else {
                    client.play(streamId, sourceStreamName, -1, 0);
                }
            }
    
        }
    
        /**
         * Converts each dispatched stream-data event into a {@code Tag} and writes it with the
         * enclosing recorder's writer.
         */
        class StreamEventDispatcher implements IEventDispatcher {
    
            /** Sum of the timestamps of all video events written so far. */
            private int videoTs;
    
            /** Sum of the timestamps of all audio events written so far. */
            private int audioTs;
    
            /**
             * Writes one event to the output file. Events that are not RTMP stream data, or whose
             * header reports size 0, are skipped. Failures are logged and the event is dropped so
             * that one bad event does not stop the recording.
             *
             * @param event the dispatched event
             */
            @Override
            public void dispatchEvent(IEvent event) {
                System.out.println("ClientStream.dispachEvent()" + event.toString());
                try {
                    if (!(event instanceof IRTMPEvent)) {
                        logger.debug("skipping non RTMP event");
                        return;
                    }
                    IRTMPEvent rtmpEvent = (IRTMPEvent) event;
                    logger.debug("rtmp event: {}, {}", rtmpEvent.getHeader(),
                            rtmpEvent.getClass().getSimpleName());
                    if (!(rtmpEvent instanceof IStreamData)) {
                        logger.debug("skipping non stream data");
                        return;
                    }
                    if (rtmpEvent.getHeader().getSize() == 0) {
                        logger.debug("skipping event where size == 0");
                        return;
                    }
    
                    ITag tag = new Tag();
                    tag.setDataType(rtmpEvent.getDataType());
                    // The tag timestamp is the sum of all timestamps seen so far for that media
                    // kind. NOTE(review): only right if event timestamps are deltas — confirm.
                    // Stream data that is neither video nor audio keeps the Tag's default.
                    if (rtmpEvent instanceof VideoData) {
                        videoTs += rtmpEvent.getTimestamp();
                        tag.setTimestamp(videoTs);
                    }
                    else if (rtmpEvent instanceof AudioData) {
                        audioTs += rtmpEvent.getTimestamp();
                        tag.setTimestamp(audioTs);
                    }
    
                    IoBuffer data = ((IStreamData) rtmpEvent).getData().asReadOnlyBuffer();
                    tag.setBodySize(data.limit());
                    tag.setBody(data);
    
                    writer.writeTag(tag);
                    System.out.println("writting....");
                }
                catch (Exception e) {
                    // Best effort: keep recording after a failed event.
                    logger.error("Failed to write stream event to the output file", e);
                }
            }
    
        }
    }