Code Execution Flow When Implementing Screen Mirroring with libstreaming

Overview

Implementing screen mirroring requires a server and at least one client. The server sends the audio/video data, which may come from the camera, screen capture, and so on. The client receives the audio/video data and displays it on screen in real time.

MediaMirror is a demo based on libstreaming: the server captures data from the camera and sends it to the client, and the client displays it on screen in real time.

Setting up mirroring works like this:
1. Connect the server and the client to the same AP.
2. On Android 6.0+, the server's runtime permissions must be granted manually.
3. Change the client's hard-coded IP address (VIDEO_URL_HOST in the client's MainActivity, shown below) to point at the server.
4. Start the server first, then the client.

MediaMirror is built on libstreaming with some modifications to the code; the analysis below is based on MediaMirror.


Starting the Server

Entry Activity

The code is short, so here it is in full:

public class MainActivity extends AppCompatActivity implements SurfaceHolder.Callback {
private SurfaceView surfaceView;

private Session session;
private RtspServer rtspServer;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

surfaceView = findViewById(R.id.surface);

surfaceView.getHolder().addCallback(this);
startRtspServer();
}

@Override
protected void onDestroy() {
super.onDestroy();
stopRtspServer();
}

private void startRtspServer() {
rtspServer = new RtspServer();
rtspServer.start();
}

private void stopRtspServer() {
rtspServer.stop();
}

private void buildSession() {
session = SessionBuilder.getInstance()
.setSurfaceView(surfaceView)
.setPreviewOrientation(90)
.setContext(this)
.setAudioEncoder(SessionBuilder.AUDIO_AAC)
.setAudioQuality(new AudioQuality(16000, 32000))// 16 kHz sample rate, 32 kbps bit rate
.setVideoEncoder(SessionBuilder.VIDEO_H264)
.setVideoQuality(new VideoQuality(320, 240, 20, 500000))// 320x240, 20 fps, 500 kbps
.build();
}

/*SurfaceHolder.Callback start*/

@Override
public void surfaceCreated(SurfaceHolder holder) {
buildSession();
}

@Override
public void surfaceChanged(SurfaceHolder holder, int format, int width, int height) {
}

@Override
public void surfaceDestroyed(SurfaceHolder holder) {
session.release();
}

/*SurfaceHolder.Callback end*/
}

The layout contains a single plain SurfaceView. When the SurfaceView fires its surfaceCreated(SurfaceHolder holder) callback, buildSession() is called; it sets the Session's parameters and finally calls SessionBuilder#build() to perform the initialization.

Independently of the surface callbacks, onCreate() creates an RtspServer instance and starts it.

SessionBuilder#build()

Create the stream for H.264 video and add it to the Session:

H264Stream stream = new H264Stream(mCameraFacing);
if (mContext != null) {
stream.setSharedPreferences(PreferenceManager.getDefaultSharedPreferences(mContext));
}
session.addVideoTrack(stream);

Create the stream for AAC audio and add it to the Session:

AacStream stream = new AacStream();
session.addAudioTrack(stream);
if (mContext != null) {
stream.setPreferences(PreferenceManager.getDefaultSharedPreferences(mContext));
}

H264Stream constructor

H264Stream is a subclass of VideoStream; here it calls VideoStream's parameterized constructor:

super(cameraId);

It also creates an H264Packetizer instance:

packetizer = new H264Packetizer();

VideoStream parameterized constructor

VideoStream is a subclass of MediaStream; it calls MediaStream's no-arg constructor here.

H264Packetizer constructor

H264Packetizer is a subclass of AbstractPacketizer; it calls AbstractPacketizer's no-arg constructor here.

AbstractPacketizer constructor

It creates an RtpSocket instance:

rtpSocket = new RtpSocket();

RtpSocket constructor

Create the array of UDP datagram packets:

datagramPackets = new DatagramPacket[bufferCount];

Create a MulticastSocket instance:

multicastSocket = new MulticastSocket();

AacStream constructor

AacStream is a subclass of AudioStream; it calls AudioStream's no-arg constructor here.

AudioStream constructor

AudioStream is a subclass of MediaStream; it calls MediaStream's no-arg constructor here.

RtspServer#start()

Create a RequestListener instance:

requestListener = new RequestListener();

RequestListener constructor

RequestListener is a Thread; its constructor creates a ServerSocket instance and then starts the thread:

serverSocket = new ServerSocket(port);
start();

RequestListener#run()

run() creates a WorkerThread for each accepted connection; WorkerThread is also a Thread and is started immediately:

new WorkerThread(serverSocket.accept()).start();

WorkerThread parameterized constructor

Wrap the client's input stream in a buffered reader and obtain its output stream:

mInput = new BufferedReader(new InputStreamReader(client.getInputStream()));
mOutput = client.getOutputStream();

Create a Session instance:

session = new Session();

WorkerThread#run()

The thread waits for requests from the client; when a request arrives, it parses it, produces the corresponding response, and sends the response back to the client.

When the client disconnects, streaming is stopped and resources are released:

boolean streaming = isStreaming();
session.syncStop();
if (streaming && !isStreaming()) {
postMessage(MESSAGE_STREAMING_STOPPED);
}
session.release();

try {
mClient.close();
} catch (IOException e) {
e.printStackTrace();
}

Starting the Client

Entry Activity

The code is short, so here it is in full:

public class MainActivity extends AppCompatActivity implements SurfaceHolder.Callback, MediaPlayer.OnPreparedListener {
private static final String VIDEO_URL_HOST = "192.168.0.114";
private static final String VIDEO_URL_PORT = "8086";// RtspServer.DEFAULT_RTSP_PORT

private SurfaceView surfaceView;

private MediaPlayer mediaPlayer;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

surfaceView = findViewById(R.id.surface);

surfaceView.getHolder().addCallback(this);
}

private void configureMediaPlayer(Uri videoUri) {
if (mediaPlayer == null) {
mediaPlayer = new MediaPlayer();
}
mediaPlayer.setDisplay(surfaceView.getHolder());
mediaPlayer.setOnPreparedListener(this);
try {
mediaPlayer.setDataSource(this, videoUri);
mediaPlayer.prepareAsync();
} catch (IOException e) {
e.printStackTrace();
}
}

private void releaseMediaPlayer() {
surfaceView.getHolder().removeCallback(this);
mediaPlayer.release();
}

/*SurfaceHolder.Callback start*/

@Override
public void surfaceCreated(SurfaceHolder holder) {
StringBuilder videoUrlBuilder = new StringBuilder();
videoUrlBuilder.append("rtsp://" + VIDEO_URL_HOST + ":" + VIDEO_URL_PORT + "/");
Uri videoUri = Uri.parse(videoUrlBuilder.toString());
configureMediaPlayer(videoUri);
}

@Override
public void surfaceChanged(SurfaceHolder holder, int format, int width, int height) {
}

@Override
public void surfaceDestroyed(SurfaceHolder holder) {
releaseMediaPlayer();
}

/*SurfaceHolder.Callback end*/

/*MediaPlayer.OnPreparedListener start*/

@Override
public void onPrepared(MediaPlayer mp) {
mp.start();
}

/*MediaPlayer.OnPreparedListener end*/
}

The layout contains a single stock Android VideoView (VideoView is a subclass of SurfaceView). When the surfaceCreated(SurfaceHolder holder) callback fires, configureMediaPlayer() is called; it creates a MediaPlayer instance, sets its parameters, and starts asynchronous preparation. Playback begins in the onPrepared(MediaPlayer mp) callback.

VideoView supports RTSP, and all the RTSP requests are issued automatically by the underlying MediaPlayer.
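
For orientation, the RTSP exchange the player drives looks roughly like this (an illustrative trace assembled from this demo's address, port, and the Transport/Session values shown later; real players may add OPTIONS requests, extra headers, or order things slightly differently):

DESCRIBE rtsp://192.168.0.114:8086/ RTSP/1.0
CSeq: 1

SETUP rtsp://192.168.0.114:8086/trackID=1 RTSP/1.0
CSeq: 2
Transport: RTP/AVP/UDP;unicast;client_port=57148-57149

PLAY rtsp://192.168.0.114:8086/ RTSP/1.0
CSeq: 3
Session: 1185d20035702ca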

The Server Receives a Request

Parse the request:

request = Request.parseRequest(mInput);

Produce the corresponding response:

response = processRequest(request);

Finally, send the response to the client:

response.send(mOutput);
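
Putting the three calls together, the request loop inside WorkerThread#run() is essentially the following (a minimal sketch reconstructed from the snippets above, not MediaMirror's exact code):

while (!Thread.interrupted()) {
    // Block until the client sends the next RTSP request
    Request request = Request.parseRequest(mInput);
    // Dispatch on the RTSP method and assemble the response
    Response response = processRequest(request);
    // Write the response back to the client socket
    response.send(mOutput);
}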

RtspServer#processRequest(Request request)

This method dispatches on the RTSP method of the client request (the standard methods such as OPTIONS, DESCRIBE, SETUP, PLAY, PAUSE, and TEARDOWN), performs the corresponding processing, assembles the response message, and returns it to the client.

1. For a DESCRIBE request, handleRequest() is called (this method creates the Session, so this request is the entry point for establishing the mirroring session):

session = handleRequest(request.uri, mClient);

Assemble the session description response:

StringBuilder builder = new StringBuilder();
builder.append("Content-Base: " + mClient.getLocalAddress().getHostAddress() + ":" + mClient.getLocalPort() + "/")// Content-Base: 10.4.70.229:8086/
.append("\r\n")
.append("Content-Type: application/sdp")// Content-Type: application/sdp
.append("\r\n");
response.attributes = builder.toString();
response.content = session.getSessionDescription();
response.status = Response.STATUS_OK;

2. For a SETUP request, the audio and video tracks are mapped to the ports specified by the client; if the client did not specify ports, the track's default ports are used:

Pattern pattern;
Matcher matcher;
int port1, port2, ssrc, trackId, src[];
String destination;

pattern = Pattern.compile("trackID=(\\w+)", Pattern.CASE_INSENSITIVE);
matcher = pattern.matcher(request.uri);

if (!matcher.find()) {
response.status = Response.STATUS_BAD_REQUEST;
return response;
}

trackId = Integer.parseInt(matcher.group(1));

if (!session.isTrackExist(trackId)) {
response.status = Response.STATUS_NOT_FOUND;
return response;
}

pattern = Pattern.compile("client_port=(\\d+)-(\\d+)", Pattern.CASE_INSENSITIVE);
matcher = pattern.matcher(request.headers.get("transport"));
IStream track = session.getTrack(trackId);

if (!matcher.find()) {
int[] ports = track.getDestinationPorts();
port1 = ports[0];
port2 = ports[1];
} else {
port1 = Integer.parseInt(matcher.group(1));
port2 = Integer.parseInt(matcher.group(2));
}

ssrc = track.getSsrc();
src = track.getLocalPorts();
destination = session.getDestination();

track.setDestinationPorts(port1, port2);

Session#syncStart() is then called to start streaming the track:

session.syncStart(trackId);

Assemble the transport description for the response:

StringBuilder builder = new StringBuilder();
builder.append("Transport: RTP/AVP/UDP;" + (InetAddress.getByName(destination).isMulticastAddress() ? "multicast" : "unicast")
+ ";destination=" + session.getDestination()
+ ";client_port=" + port1 + "-" + port2
+ ";server_port=" + src[0] + "-" + src[1]
+ ";ssrc=" + Integer.toHexString(ssrc)
+ ";mode=play"
)// Transport: RTP/AVP/UDP;unicast;destination=10.4.70.110;client_port=57148-57149;server_port=39848-33032;ssrc=4b5eeef9;mode=play
.append("\r\n")
.append("Session: " + "1185d20035702ca")// Session: 1185d20035702ca
.append("\r\n")
.append("Cache-Control: no-cache")// Cache-Control: no-cache
.append("\r\n");
response.attributes = builder.toString();
response.status = Response.STATUS_OK;

RtspServer#handleRequest(String uri, Socket client)

It calls UriParser#parse(), which parses the parameters the client passed in the URI:

Session session = UriParser.parse(uri);
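
As an illustration, libstreaming allows the client to configure the session through query parameters in this URI, along the lines of rtsp://192.168.0.114:8086?h264&flash=on&camera=front (parameter names taken from libstreaming's documentation; whether MediaMirror keeps the full set is an assumption).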

Set the session's origin and destination addresses:

session.setOrigin(client.getLocalAddress().getHostAddress());
if (session.getDestination() == null) {
session.setDestination(client.getInetAddress().getHostAddress());
}

Session#syncStart(int id)

Start the stream:

stream.start();

For H.264 video this calls, in order, H264Stream#start() -> VideoStream#start() -> MediaStream#start() -> MediaStream#encodeWithMediaCodec() -> VideoStream#encodeWithMediaCodec() -> VideoStream#encodeWithMediaCodecMethod1().

VideoStream#encodeWithMediaCodecMethod1()

Start the camera preview:

camera.startPreview();

Probe the device's encoder with EncoderDebugger, then configure and start the encoder:

EncoderDebugger debugger = EncoderDebugger.debug(mSharedPreferences, mQuality.resX, mQuality.resY);
final Nv21Convertor convertor = debugger.getNv21Convertor();
mediaCodec = MediaCodec.createByCodecName(debugger.getEncoderName());
MediaFormat mediaFormat = MediaFormat.createVideoFormat("video/avc", mQuality.resX, mQuality.resY);
mediaFormat.setInteger(MediaFormat.KEY_BIT_RATE, mQuality.bitrate);
mediaFormat.setInteger(MediaFormat.KEY_FRAME_RATE, mQuality.framerate);
mediaFormat.setInteger(MediaFormat.KEY_COLOR_FORMAT, debugger.getEncoderColorFormat());
mediaFormat.setInteger(MediaFormat.KEY_I_FRAME_INTERVAL, 1);
mediaCodec.configure(mediaFormat, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE);
mediaCodec.start();

Convert each frame delivered by the preview callback into the encoder's color format, then queue it into the encoder:

for (int i = 0; i < 10; i++) {
camera.addCallbackBuffer(new byte[convertor.getBufferSize()]);
}
camera.setPreviewCallbackWithBuffer(new Camera.PreviewCallback() {
ByteBuffer[] inputBuffers = mediaCodec.getInputBuffers();

@Override
public void onPreviewFrame(byte[] data, Camera camera) {
try {
int bufferIndex = mediaCodec.dequeueInputBuffer(500000);
if (bufferIndex >= 0) {
inputBuffers[bufferIndex].clear();
if (data == null) {
Log.e(TAG, "Symptom of the \"Callback buffer was to small\" problem...");
} else {
convertor.convert(data, inputBuffers[bufferIndex]);
}
long now = System.nanoTime() / 1000;
mediaCodec.queueInputBuffer(bufferIndex, 0, inputBuffers[bufferIndex].position(), now, 0);
} else {
Log.e(TAG, "No buffer available !");
}
} finally {
VideoStream.this.camera.addCallbackBuffer(data);
}
}
});

Feed the encoder's output into the packetizer and start the packetizer thread:

packetizer.setInputStream(new MediaCodecInputStream(mediaCodec));
packetizer.start();

H264Packetizer#run()

It calls send() to send the data.

H264Packetizer#send()

Read the NAL unit prefix into header (after this, header[4] holds the NAL header byte):

fill(header, 0, NALU_LENGTH_LENGTH);

Compute the timestamp and the NAL unit length (the +1 accounts for the NAL header byte already consumed into header):

timestamp = ((MediaCodecInputStream) inputStream).getLastBufferInfo().presentationTimeUs * 1000L;
naluLength = inputStream.available() + 1;

When an IDR NAL unit is encountered, the SPS/PPS parameter sets (aggregated in mStapA as a STAP-A packet) are sent first via AbstractPacketizer#send():

buffer = rtpSocket.requestBuffer();
rtpSocket.markNextPacket();
rtpSocket.updateTimestamp(timestamp);
System.arraycopy(mStapA, 0, buffer, RtpSocket.RTP_HEADER_LENGTH, mStapA.length);
super.send(RtpSocket.RTP_HEADER_LENGTH + mStapA.length);

If the NAL unit is no larger than the configured maximum packet size, it is sent as a single NAL unit packet via AbstractPacketizer#send(); otherwise the NAL unit is split into FU-A fragments, each of which is sent via AbstractPacketizer#send():

// Small NAL unit => Single NAL unit
if (naluLength <= MAX_PACKET_SIZE - RtpSocket.RTP_HEADER_LENGTH - 2) {
buffer = rtpSocket.requestBuffer();
// The byte right after the RTP header carries the NAL unit header (header[4])
buffer[RtpSocket.RTP_HEADER_LENGTH] = header[4];
fill(buffer, RtpSocket.RTP_HEADER_LENGTH + 1, naluLength - 1);
rtpSocket.updateTimestamp(timestamp);
rtpSocket.markNextPacket();
super.send(naluLength + RtpSocket.RTP_HEADER_LENGTH);
// Log.d(TAG,"----- Single NAL unit - len:"+len+" delay: "+delay);
}
// Large NAL unit => Split nal unit
else {
// Set FU-A header
header[1] = (byte) (header[4] & 0x1F); // FU header type
header[1] += 0x80; // Start bit
// Set FU-A indicator
header[0] = (byte) ((header[4] & 0x60) & 0xFF); // FU indicator NRI
header[0] += 28;

while (sum < naluLength) {
buffer = rtpSocket.requestBuffer();
buffer[RtpSocket.RTP_HEADER_LENGTH] = header[0];
buffer[RtpSocket.RTP_HEADER_LENGTH + 1] = header[1];
rtpSocket.updateTimestamp(timestamp);
if ((len = fill(buffer, RtpSocket.RTP_HEADER_LENGTH + 2, Math.min(naluLength - sum, MAX_PACKET_SIZE - RtpSocket.RTP_HEADER_LENGTH - 2))) < 0) {
return;
}

sum += len;
// Last packet before next NAL
if (sum >= naluLength) {
// End bit on
buffer[RtpSocket.RTP_HEADER_LENGTH + 1] += 0x40;
rtpSocket.markNextPacket();
}
super.send(len + RtpSocket.RTP_HEADER_LENGTH + 2);
// Switch start bit
header[1] = (byte) (header[1] & 0x7F);
// Log.d(TAG,"----- FU-A unit, sum:"+sum);
}
}
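
A quick worked example of the bit arithmetic above (illustrative only, not part of MediaMirror). For an IDR slice, the NAL header byte is 0x65 (NRI = 3, type = 5):

byte nalHeader = 0x65;                                   // IDR slice: NRI = 3, type = 5
byte fuIndicator = (byte) ((nalHeader & 0x60) + 28);     // keep NRI, set type 28 (FU-A): 0x7C
byte fuHeaderStart = (byte) (0x80 | (nalHeader & 0x1F)); // start bit + original type: 0x85
byte fuHeaderEnd = (byte) (0x40 | (nalHeader & 0x1F));   // end bit + original type: 0x45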

AbstractPacketizer#send(int length)

It calls RtpSocket#commitBuffer():

rtpSocket.commitBuffer(length);

RtpSocket#commitBuffer(int length)

commitBuffer() first calls updateSequence(), which increments the RTP sequence number and writes it into the RTP header via setLong(). A single byte can only represent 256 distinct values, so the sequence number is stored big-endian across two consecutive header bytes.
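
A minimal sketch of what setLong() amounts to (close to libstreaming's helper, but treat it as illustrative): it writes a value big-endian across a byte range of the RTP header, e.g. the 16-bit sequence number into bytes 2-3.

// Writes n big-endian into buffer[begin..end)
private void setLong(byte[] buffer, long n, int begin, int end) {
    for (end--; end >= begin; end--) {
        buffer[end] = (byte) (n % 256);
        n >>= 8;
    }
}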

Set the length of the datagram at the current producer index:

datagramPackets[bufferInputIndex].setLength(length);

The pre-allocated datagrams form a ring buffer; when the producer index moves past the end of the ring, it wraps back to 0:

if (++bufferInputIndex >= bufferCount) {
bufferInputIndex = 0;
}

If the RtpSocket thread has not been started yet, start it:

if (mThread == null) {
mThread = new Thread(this);
mThread.start();
}

RtpSocket#run()

It calls SenderReport#update() to update the sender's statistics:

senderReport.update(datagramPackets[bufferOutputIndex].getLength(), (timestamps[bufferOutputIndex] / 100L) * (mClock / 1000L) / 10000L);
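
The split arithmetic converts the nanosecond timestamp into RTP clock ticks (timestamp × mClock / 10^9) without overflowing a long: with mClock = 90000 (the usual H.264 RTP clock rate) and a 1-second timestamp of 10^9 ns, (10^9 / 100) × (90000 / 1000) / 10000 = 10^7 × 90 / 10^4 = 90000 ticks.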

The first 30 buffers are skipped; after that, the buffered datagrams are sent:

if (count++ > 30) {
if (transport == TRANSPORT_UDP) {
multicastSocket.send(datagramPackets[bufferOutputIndex]);// TODO
} else {
sendTcp();
}
}

Likewise, when the consumer index moves past the end of the ring, it wraps back to 0:

if (++bufferOutputIndex >= BUFFER_COUNT) {
bufferOutputIndex = 0;
}

SenderReport#update(int length, long rtpTimestamp)

It updates the report's statistics and finally calls send():

send(System.nanoTime(), rtpTimestamp);

SenderReport#send(long ntpTimestamp, long rtpTimestamp)

Send the RTCP data; the sender report pairs an NTP wall-clock timestamp with the RTP timestamp so the receiver can synchronize the streams:

multicastSocket.send(datagramPacket);

