Skip to content

Commit

Permalink
pull stream support UDP mode
Browse files Browse the repository at this point in the history
  • Loading branch information
macbookpro committed Jan 4, 2019
1 parent cd373b1 commit f156df5
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 36 deletions.
37 changes: 33 additions & 4 deletions rtsp/rtsp-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type RTSPClient struct {
Path string
CustomPath string //custom path for pusher
ID string
Conn net.Conn
Conn *RichConn
Session string
Seq int
connRW *bufio.ReadWriter
Expand All @@ -57,6 +57,7 @@ type RTSPClient struct {
vRTPChannel int
vRTPControlChannel int

UDPServer *UDPServer
RTPHandles []func(*RTPPack)
StopHandles []func()
}
Expand All @@ -76,6 +77,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
URL: rawUrl,
ID: shortid.MustGenerate(),
Path: url.Path,
TransType: TRANS_TYPE_UDP,
vRTPChannel: 0,
vRTPControlChannel: 1,
aRTPChannel: 2,
Expand Down Expand Up @@ -206,14 +208,14 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
// handle error
return err
}
client.Conn = conn

networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(204800)

timeoutConn := RichConn{
conn,
timeout,
}
client.Conn = &timeoutConn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer))

headers := make(map[string]string)
Expand Down Expand Up @@ -280,7 +282,21 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/")
}
headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel)
if client.TransType == TRANS_TYPE_TCP {
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel)
} else {
if client.UDPServer == nil {
client.UDPServer = &UDPServer{RTSPClient: client}
}
//RTP/AVP;unicast;client_port=64864-64865
err = client.UDPServer.SetupVideo()
if err != nil {
logger.Printf("Setup video err.%v", err)
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" {
headers["Session"] = Session
}
Expand All @@ -300,7 +316,20 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/")
}
headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel)
if client.TransType == TRANS_TYPE_TCP {
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel)
} else {
if client.UDPServer == nil {
client.UDPServer = &UDPServer{RTSPClient: client}
}
err = client.UDPServer.SetupAudio()
if err != nil {
logger.Printf("Setup audio err.%v", err)
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" {
headers["Session"] = Session
}
Expand Down
50 changes: 36 additions & 14 deletions rtsp/rtsp-session.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ func (session *Session) handleRequest(req *Request) {
setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
}
setupPath := setupUrl.String()

// 播放器可能直接从SETUP来,不用DESCRIBE(比如可能事先已经获取过了)
if session.Pusher == nil {
session.Path = setupUrl.Path
pusher := session.Server.GetPusher(session.Path)
if pusher == nil {
res.StatusCode = 404
res.Status = "NOT FOUND"
return
}
session.Type = SESSEION_TYPE_PLAYER
session.Player = NewPlayer(session, pusher)
session.Pusher = pusher
session.AControl = pusher.AControl()
session.VControl = pusher.VControl()
session.ACodec = pusher.ACodec()
session.VCodec = pusher.VCodec()
session.Conn.timeout = 0
}
//setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:]
vPath := ""
if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 {
Expand Down Expand Up @@ -446,7 +465,7 @@ func (session *Session) handleRequest(req *Request) {
session.TransType = TRANS_TYPE_UDP
// no need for tcp timeout.
session.Conn.timeout = 0
if session.UDPClient == nil {
if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil {
session.UDPClient = &UDPClient{
Session: session,
}
Expand All @@ -458,14 +477,15 @@ func (session *Session) handleRequest(req *Request) {
}
logger.Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
}
}

if session.Type == SESSION_TYPE_PUSHER {
if err := session.Pusher.UDPServer.SetupAudio(); err != nil {
res.StatusCode = 500
Expand All @@ -485,12 +505,14 @@ func (session *Session) handleRequest(req *Request) {
ts = strings.Join(tss, ";")
}
} else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
}
}

if session.Type == SESSION_TYPE_PUSHER {
Expand Down
70 changes: 52 additions & 18 deletions rtsp/udp-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package rtsp

import (
"bytes"
"fmt"
"log"
"net"
"strconv"
"strings"
Expand All @@ -11,6 +13,7 @@ import (

type UDPServer struct {
*Session
*RTSPClient

APort int
AConn *net.UDPConn
Expand All @@ -24,6 +27,45 @@ type UDPServer struct {
Stoped bool
}

func (s* UDPServer)AddInputBytes(bytes int) {
if s.Session != nil {
s.Session.InBytes += bytes
return
}
if s.RTSPClient != nil {
s.RTSPClient.InBytes += bytes
return
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}

func (s *UDPServer)HandleRTP(pack *RTPPack) {
if s.Session != nil {
for _, v := range s.Session.RTPHandles {
v(pack)
}
return
}

if s.RTSPClient != nil {
for _, v := range s.RTSPClient.RTPHandles {
v(pack)
}
return
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}

func (s *UDPServer) Logger() *log.Logger {
if s.Session != nil {
return s.Session.logger
}
if s.RTSPClient != nil {
return s.RTSPClient.logger
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}

func (s *UDPServer) Stop() {
if s.Stoped {
return
Expand All @@ -48,7 +90,7 @@ func (s *UDPServer) Stop() {
}

func (s *UDPServer) SetupAudio() (err error) {
logger := s.logger
logger := s.Logger()
addr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
return
Expand Down Expand Up @@ -77,15 +119,13 @@ func (s *UDPServer) SetupAudio() (err error) {
for !s.Stoped {
if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n)
s.Session.InBytes += n
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: bytes.NewBuffer(rtpBytes),
}
for _, h := range s.Session.RTPHandles {
h(pack)
}
s.HandleRTP(pack)
} else {
logger.Println("udp server read audio pack error", err)
continue
Expand Down Expand Up @@ -119,15 +159,13 @@ func (s *UDPServer) SetupAudio() (err error) {
for !s.Stoped {
if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n)
s.Session.InBytes += n
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: bytes.NewBuffer(rtpBytes),
}
for _, h := range s.Session.RTPHandles {
h(pack)
}
s.HandleRTP(pack)
} else {
logger.Println("udp server read audio control pack error", err)
continue
Expand All @@ -138,7 +176,7 @@ func (s *UDPServer) SetupAudio() (err error) {
}

func (s *UDPServer) SetupVideo() (err error) {
logger := s.logger
logger := s.Logger()
addr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
return
Expand Down Expand Up @@ -167,15 +205,13 @@ func (s *UDPServer) SetupVideo() (err error) {
for !s.Stoped {
if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n)
s.Session.InBytes += n
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: bytes.NewBuffer(rtpBytes),
}
for _, h := range s.Session.RTPHandles {
h(pack)
}
s.HandleRTP(pack)
} else {
logger.Println("udp server read video pack error", err)
continue
Expand Down Expand Up @@ -210,15 +246,13 @@ func (s *UDPServer) SetupVideo() (err error) {
for !s.Stoped {
if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n)
s.Session.InBytes += n
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: bytes.NewBuffer(rtpBytes),
}
for _, h := range s.Session.RTPHandles {
h(pack)
}
s.HandleRTP(pack)
} else {
logger.Println("udp server read video control pack error", err)
continue
Expand Down

0 comments on commit f156df5

Please sign in to comment.