Documentation/Tutorials/RTMPClient

RTMP クライアント

0.8 で RTMPClient に新たに変更が加えられ、柔軟性が向上しています。

RTMPClient は、継承したり、ほかのクラスに合成したりすることができます。どちらの場合も、ストリームイベントのコールバックメソッドの処理および例外処理のためのコールバックインタフェースをいくつか実装する必要があります。

RTMPClient の継承

import org.red5.server.api.service.IPendingServiceCallback;
import org.red5.server.net.rtmp.ClientExceptionHandler;
import org.red5.server.net.rtmp.INetStreamEventHandler;
import org.red5.server.api.event.IEventDispatcher; 

public class RTMPClientExtended extends RTMPClient implements INetStreamEventHandler, IPendingServiceCallback, ClientExceptionHandler, IEventDispatcher, ISharedObjectListener, IPushableConsumer, IPipeConnectionListener
{
....
}

RTMPClient サービスコールバックハンドラ

次のようにしてクラスでハンドラをセットアップする必要があります。

rtmpClient.setServiceProvider(this);
rtmpClient.setExceptionHandler(this);
rtmpClient.setCallbackProvider(this);
rtmpClient.setStreamEventDispatcher(this);

コールバックリザルトハンドラ

コールバックリザルトハンドラ

resultReceived メソッドは接続のステータスを決定します。Flash のステータスイベントと同様に動作します。

synchronized public void resultReceived(IPendingServiceCall call) { 
      Object result = call.getResult();
                
                if (result instanceof ObjectMap)
                {
                        
                        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
                        {
                                log.info("Connected");
                                state = RTMPStreamState.STREAM_CREATING;
                                createStream(this);
                                
                        }
               }
.....
                else {
                
                  if ("createStream".equals(call.getServiceMethodName())) {
                        
                        state = RTMPStreamState.PLAYING;
        
                        if (result instanceof Integer) {
                                Integer streamIdInt = (Integer) result;
                                streamId = streamIdInt.intValue();
                                log.debug("createStream result stream id: " + streamId); 
                                play(streamIdInt, fileName, start, duration);
                        } else {
                                disconnect();
                                state = RTMPStreamState.STOPPED;
                        }
                        
                  }
        }
}

コールバックストリームステータスハンドラ

createStream(this); で再生およびストリーミング用の NetStream をセットアップします。次の呼び出しでは creatStream をトラップし、最初に streamID を取得することによってファイルを再生することができます。

ストリームイベントステータスは、次のようにしてセットアップされます。

public void onStatus(Object obj)
{
   ObjectMap map = (ObjectMap) obj;
   String code = (String) map.get("code");
   String description = (String) map.get("description");
   String details = (String) map.get("details");
                
   if (StatusCodes.NS_PLAY_START.equals(code))
   {
      .......

   }
}

コールバックエラーハンドラ

接続エラーを処理するためのクライアント例外ハンドラは、次のようになります。

public void handleException(Throwable throwable)
{
                log.error("{}",new Object[]{throwable.getCause()});
}

ディスパッチイベントハンドラ

ストリーミング再生中の NetStream イベントの処理用で、ストリームのパケットをトラップします。次のようになります。

public void dispatchEvent(IEvent event) {  
        if (!(event instanceof IRTMPEvent)) {  
            log.debug("skipping non rtmp event: " + event);  
            return;  
        }  
        IRTMPEvent rtmpEvent = (IRTMPEvent) event;  
        /*
        if (log.isDebugEnabled()) {  
            log.debug("rtmp event: " + rtmpEvent.getHeader() + ", "  
                    + rtmpEvent.getClass().getSimpleName());  
        } */
        if (!(rtmpEvent instanceof IStreamData)) {  
            log.debug("skipping non stream data");  
            return;  
        }  
        if (rtmpEvent.getHeader().getSize() == 0) {  
            log.debug("skipping event where size == 0");  
            return;  
        }  
        ITag tag = new Tag();  
        tag.setDataType(rtmpEvent.getDataType());  
        if (rtmpEvent instanceof VideoData) {  
            videoTs += rtmpEvent.getTimestamp();  
            tag.setTimestamp(videoTs);  
        } else if (rtmpEvent instanceof AudioData) {  
            audioTs += rtmpEvent.getTimestamp();  
            tag.setTimestamp(audioTs);  
        }  
        
        ByteBuffer data = ((IStreamData) rtmpEvent).getData().asReadOnlyBuffer();  
        tag.setBodySize(data.limit());  
        tag.setBody(data);  
        //log.debug(data.toString());
        try {  
            writer.writeTag(tag);  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }  
 }  

接続タイムアウトは依然、RTMPClient でハードコードされていますが、継承クラスで startConnector メソッドのコードを書き換えれば変更できます。具体的には CONNECTOR_WORKER_TIMEOUT 定数としてハードコードされています。RTMPClient では、セッターメソッドを有効にして、タイムアウトを動的に設定できるようにする必要があります。

サンプル

再生のサンプル

synchronized public void resultReceived(IPendingServiceCall call) { 
      Object result = call.getResult();
                
                if (result instanceof ObjectMap)
                {
                        
                        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
                        {
                                log.info("Connected");
                                state = RTMPStreamState.STREAM_CREATING;
                                createStream(this);
                                
                        }
               }
.....
                else {
                
                  if ("createStream".equals(call.getServiceMethodName())) {
                        
                        state = RTMPStreamState.PLAYING;
        
                        if (result instanceof Integer) {
                                Integer streamIdInt = (Integer) result;
                                streamId = streamIdInt.intValue();
                                log.debug("createStream result stream id: " + streamId); 
                               play(streamId, "stream.flv", 0); 
                        } else {
                                disconnect();
                                state = RTMPStreamState.STOPPED;
                        }
                        
                  }
        }
}

配信のサンプル

synchronized public void resultReceived(IPendingServiceCall call) { 
      Object result = call.getResult();
                
                if (result instanceof ObjectMap)
                {
                        
                        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
                        {
                                log.info("Connected");
                                state = RTMPStreamState.STREAM_CREATING;
                                createStream(this);
                                
                        }
               }
.....
                else {
                
                  if ("createStream".equals(call.getServiceMethodName())) {
                        
                        state = RTMPStreamState.PLAYING;
        
                        if (result instanceof Integer) {
                                Integer streamIdInt = (Integer) result;
                                streamId = streamIdInt.intValue();
                                log.debug("createStream result stream id: " + streamId); 
                                publish("publishtest", streamId, "live", this);
                        } else {
                                disconnect();
                                state = RTMPStreamState.STOPPED;
                        }
                        
                  }
        }
}
public void onStatus(Object obj)
{
   ObjectMap map = (ObjectMap) obj;
   String code = (String) map.get("code");
   String description = (String) map.get("description");
   String details = (String) map.get("details");
                
   if (StatusCodes.NS_PUBLISH_START.equals(code))
   {
        state = RTMPStreamState.PUBLISH_START;
        log.debug("{} for {}", new Object[]{code,details});
        
                service = new FLVService();
                service.setSerializer(new Serializer());
                service.setDeserializer(new Deserializer());
                
                log.info("Started Publishing");
                
                // ファイルを読み取ってデータを配信!!
                try {
                
                        File f = new File(dir + "/" + fileName);
                        log.debug("test: {}", f);

                        IFLV flv = (IFLV) service.getStreamableFile(f);
                        flv.setCache(NoCacheImpl.getInstance());        
                        
                        ITagReader reader = flv.getReader();
        
                        FileStreamSource src = new FileStreamSource(reader);
                        
                        while (src.hasMore())
                        {
                                IRTMPEvent event = src.dequeue();
                                RTMPMessage rtmpMsg = new RTMPMessage();
                                rtmpMsg.setBody(rtmpEvent);
                
                                publishStreamData(streamId, rtmpMsg);
                        }
                        
                        this.getRTMPClient().unpublish(streamId);

                } catch (Exception ex) {
                        log.error(ex.getCause().toString());
                }
   
   }
}

共有オブジェクトのサンプル

synchronized public void resultReceived(IPendingServiceCall call) { 
      Object result = call.getResult();
                
                if (result instanceof ObjectMap)
                {
                        
                        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
                        {
                                log.info("Connected");
                                state = RTMPStreamState.STREAM_CREATING;
                                createStream(this);
                               
                                IClientSharedObject obj = getSharedObject("server", true);
                                obj.addSharedObjectListener(this);
                               
                                Map<String,Object> map = new HashMap<String,Object>();
                                map.put("key","value");
                        
                              obj.beginUpdate();
                             obj.setAttributes(map);
                            obj.endUpdate();
                            obj.clear();

                                
                        }
               }
.....
                else {
                
                  if ("createStream".equals(call.getServiceMethodName())) {
                        
                        state = RTMPStreamState.PLAYING;
        
                        if (result instanceof Integer) {
                                Integer streamIdInt = (Integer) result;
                                streamId = streamIdInt.intValue();
                                log.debug("createStream result stream id: " + streamId); 
                               
                        } else {
                                disconnect();
                                state = RTMPStreamState.STOPPED;
                        }
                        
                  }
        }
}

共有オブジェクトリスナー

public void onSharedObjectClear(ISharedObjectBase so) 
        {
                log.debug("Shared Object Clear");
        }
        
        public void onSharedObjectConnect(ISharedObjectBase so) 
        {
                log.debug("Shared Object Connect");
        }
        
        public void onSharedObjectDelete(ISharedObjectBase so, String key)
        {
                log.debug("Shared Object Delete");
        }
        
        public void onSharedObjectDisconnect(ISharedObjectBase so) 
        {
                log.debug("Shared Object Disconnect");
        }
        
        public void onSharedObjectSend(ISharedObjectBase so, String method, List params) 
        {
                log.debug("Shared Object Send");
        }
        
        public void onSharedObjectUpdate(ISharedObjectBase so, IAttributeStore values) 
        {
                log.debug("Shared Object Update");
        }
        
        public void onSharedObjectUpdate(ISharedObjectBase so, Map<String,Object> values) 
        {
                log.debug("Shared Object Updae");
        }
        
        public void onSharedObjectUpdate(ISharedObjectBase so, String key, Object value) 
        {
                log.debug("Shared Object Updae");
        }

コールバックのサンプル

synchronized public void resultReceived(IPendingServiceCall call) { 
      Object result = call.getResult();
                
                if (result instanceof ObjectMap)
                {
                        
                        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
                        {
                                log.info("Connected");
                                state = RTMPStreamState.STREAM_CREATING;
                                createStream(this);
                               
                               invoke("service.CallService", new Object[]{"arg1","arg2"}, this);
                               
                                

                                
                        }
               }
.....
                else {
                
                  if ("createStream".equals(call.getServiceMethodName())) {
                        
                        state = RTMPStreamState.PLAYING;
        
                        if (result instanceof Integer) {
                                Integer streamIdInt = (Integer) result;
                                streamId = streamIdInt.intValue();
                                log.debug("createStream result stream id: " + streamId); 
                            
                        } else {
                                disconnect();
                                state = RTMPStreamState.STOPPED;
                        }
                        
                  }
        }
}