自学内容网 自学内容网

深入浅出mediasoup—协议交互

本文主要分析 mediasoup 一对一 WebRTC 通信 demo 的协议交互,从协议层面了解 mediasoup 的设计与实现,这是深入阅读 mediasoup 源码的重要基础。

1. 时序图

下图是 mediasoup 客户端-服务器协议交互的总体架构,服务器是一个 Node.js 进程加一个 Worker 服务进程,客户端是一个 React 应用加一个支持 WebRTC 的浏览器,mediasoup 为客户端和服务器都提供了 SDK,服务器 SDK 封装了与 Worker 之间管道通信的协议细节,客户端 SDK 封装了 WebRTC API 接口,对外提供 ORTC 接口。

下图是 mediasoup 建立一对一 WebRTC 通信所涉及协议交互的时序图,展示了协议交互的几个重要阶段:初始化、准备、发送媒体和接受媒体,接下来的内容就来详细分析整个协议交互过程和实现细节。

2. 初始化阶段(Initialize)

Node.js 进程启动时会创建 Worker 子进程,并为每个 Worker 创建一个 WebRtcServer 对象用来承载媒体传输(在《通信框架》篇讲过,WebRtcServer 用来做端口汇聚)。同时,还会创建 WebSocket 服务,开始监听客户端连接。

客户端启动时会进行本地初始化,构造 URL 携带 roomId 和 peerId,开始连接 WebSocket 服务。服务器收到客户端连接会尝试创建房间(如果已经存在则不需创建),并在 Worker 上创建对应的 Router。

2.1. createWorker

可以在服务器的配置文件中配置启动的 Worker 数量,默认等于 cpu 核心数量。

module.exports =
{
...

mediasoup :
{
numWorkers     : Object.keys(os.cpus()).length,
...
}
...
}

服务器启动后,根据配置文件调用 createWorker 创建指定数量的 Worker。

async function runMediasoupWorkers()
{
// 从配置文件中获取启动的worker数量
const { numWorkers } = config.mediasoup;

for (let i = 0; i < numWorkers; ++i)
{
// 创建Worker
const worker = await mediasoup.createWorker(
{
dtlsCertificateFile : config.mediasoup.workerSettings.dtlsCertificateFile,
dtlsPrivateKeyFile  : config.mediasoup.workerSettings.dtlsPrivateKeyFile,
logLevel            : config.mediasoup.workerSettings.logLevel,
logTags             : config.mediasoup.workerSettings.logTags,
rtcMinPort          : Number(config.mediasoup.workerSettings.rtcMinPort),
rtcMaxPort          : Number(config.mediasoup.workerSettings.rtcMaxPort)
});

...

// 保存到worker集合
mediasoupWorkers.push(worker);

...
}
...
}

createWorker 接口参数如下:

export async function createWorker<
WorkerAppData extends types.AppData = types.AppData,
>({
// 打印的日志级别
logLevel = 'error',
// 打印的日志标签
logTags,
// RTC 通信端口范围
rtcMinPort = 10000,
rtcMaxPort = 59999,
// DTLS证书,所有DtlsTransport需要
dtlsCertificateFile,
// DTLS私钥,所有DtlsTransport需要
dtlsPrivateKeyFile,
// 可以配置 WebRTC 实验特性
libwebrtcFieldTrials,
// 自定义数据
appData,
}: WorkerSettings<WorkerAppData> = {}): Promise<Worker<WorkerAppData>> {

createWorker 通过spawn 创建 Worker 子进程,并建立 pipe 通信。

constructor({
logLevel,
logTags,
rtcMinPort,
rtcMaxPort,
dtlsCertificateFile,
dtlsPrivateKeyFile,
libwebrtcFieldTrials,
appData,
}: WorkerSettings<WorkerAppData>) {
...

// 创建Worker子进程
this.#child = spawn(
// worker可执行程序路径
spawnBin,
// worker启动参数
spawnArgs,
// options
{
// 子进程环境变量
env: {
MEDIASOUP_VERSION: version,
...process.env, // 继承父进程环境变量
},
// 子进程跟随父进程一起退出
detached: false,
// 忽略子进程的标准输入(fd 0)
// 创建管道关联子进程的标准输出(fd 1)和标准错误(fd 2),允许Node.js代码读取
// 创建管道关联子进程的fd 3和fd 4,允许Node.js代码进行读写,用来传输自定义协议
stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'],
// 隐藏子进程控制台
windowsHide: true,
}

// 保存子进程PID
this.#pid = this.#child.pid!;

// 基于子进程的fd 3和fd 4创建管道
this.#channel = new Channel({
// 主进程通过fd 3向子进程写入消息
producerSocket: this.#child.stdio[3],
// 主进程通过fd 4从子进程接收消息
consumerSocket: this.#child.stdio[4],
pid: this.#pid,
});

...

// 监听子进程Worker的stdout,在主进程通过日志器输出
this.#child.stdout!.on('data', buffer => {
for (const line of buffer.toString('utf8').split('\n')) {
if (line) {
workerLogger.debug(`(stdout) ${line}`);
}
}
});

// 监听子进程Worker的stderr,在主进程通过日志器输出
this.#child.stderr!.on('data', buffer => {
for (const line of buffer.toString('utf8').split('\n')) {
if (line) {
workerLogger.error(`(stderr) ${line}`);
}
}
});
);
...
}

2.2. createWebRtcServer

WebRtcServer 用来实现媒体通信端口聚合,可以配置其监听地址和端口,mediasoup 支持使用 TCP 或 UDP 传输媒体。

【注意】这里有两个地址,“ip”是进程监听的地址;“announceAddress”是公告给客户端来连接的地址,如果监听的是私网地址或 0.0.0.0,则一定要配置 announceAddress,否则客户端会连接失败。

module.exports =
{
...
mediasoup :
{
...
webRtcServerOptions :
{
listenInfos :
[
{
protocol         : 'udp',
ip               : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedAddress : process.env.MEDIASOUP_ANNOUNCED_IP,
port             : 44444
},
{
protocol         : 'tcp',
ip               : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedAddress : process.env.MEDIASOUP_ANNOUNCED_IP,
port             : 44444
}
]
},
...
}
...
}

每个 Worker 对应创建一个 WebRtcServer,需要防止 WebRtcServer 监听端口冲突。

async function runMediasoupWorkers()
{
...
if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== 'false')
{
// 从配置文件获取选项信息
const webRtcServerOptions = utils.clone(config.mediasoup.webRtcServerOptions);
const portIncrement = mediasoupWorkers.length - 1;

// 同一个主机上,不同Worker监听端口不能冲突
for (const listenInfo of webRtcServerOptions.listenInfos)
{
listenInfo.port += portIncrement;
}

// 创建WebRtcServer
const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);

// 设置webrtcServer
worker.appData.webRtcServer = webRtcServer;
}
...
}

createWebRtcServer 实现如下,在这里分配 WebRtcServer ID,向 Worker 进程发送 pipe 消息。

async createWebRtcServer<WebRtcServerAppData extends AppData = AppData>({
listenInfos,
appData,
}: WebRtcServerOptions<WebRtcServerAppData>): Promise<
WebRtcServer<WebRtcServerAppData>
> {
...

// Build the request.
const fbsListenInfos: FbsTransport.ListenInfoT[] = [];

for (const listenInfo of listenInfos) {
fbsListenInfos.push(
new FbsTransport.ListenInfoT(
listenInfo.protocol === 'udp'
? FbsTransportProtocol.UDP
: FbsTransportProtocol.TCP,
listenInfo.ip,
listenInfo.announcedAddress ?? listenInfo.announcedIp,
listenInfo.port,
portRangeToFbs(listenInfo.portRange),
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
)
);
}

// 创建UUID作为WebRtcServer的ID
const webRtcServerId = utils.generateUUIDv4();

const createWebRtcServerRequestOffset =
new FbsWorker.CreateWebRtcServerRequestT(
webRtcServerId,
fbsListenInfos
).pack(this.#channel.bufferBuilder);

// 向Worker进程发送pipe消息
await this.#channel.request(
FbsRequest.Method.WORKER_CREATE_WEBRTCSERVER,
FbsRequest.Body.Worker_CreateWebRtcServerRequest,
createWebRtcServerRequestOffset
);

const webRtcServer = new WebRtcServer<WebRtcServerAppData>({
internal: { webRtcServerId },
channel: this.#channel,
appData,
});

this.#webRtcServers.add(webRtcServer);
webRtcServer.on('@close', () => this.#webRtcServers.delete(webRtcServer));

// Emit observer event.
this.#observer.safeEmit('newwebrtcserver', webRtcServer);

return webRtcServer;
}

2.3. connect

客户端启动后会立即连接 WebSocket 服务,建立双向信令传输信道。mediasoup 使用 protoo 库实现WebSocket 通信。连接 WebSocket 服务的逻辑流程描述如下:

1)app 打包时,指定 index.jsx 为入口

{
  ...
  "main": "lib/index.jsx",
...
}
...
const PKG = require('./package');
...

function bundle(options)
{
...
let bundler = browserify(
{
entries      : PKG.main, // 入口定义
...
})
...
}
...

2)加载 index.jsx 会执行 run 函数,并加载 Room 组件

// 生命周期钩子
domready(async () =>
{
logger.debug('DOM ready');

await utils.initialize();

run();
});

...

// 渲染函数
render(
<Provider store={store}>
<RoomContext.Provider value={roomClient}>
<Room />
</RoomContext.Provider>
</Provider>,
document.getElementById('mediasoup-demo-app-container')
);

...

3)在 Room 组件加载过程中,会调用 RoomClient 的 join 方法

// 生命周期钩子
componentDidMount()
{
const { roomClient }= this.props;

roomClient.join();
}

4)在 RoomClient::join 方法中创建 websocket 连接

async join()
{
store.dispatch(
stateActions.setMediasoupClientVersion(mediasoupClient.version));

const protooTransport = new protooClient.WebSocketTransport(this._protooUrl);

this._protoo = new protooClient.Peer(protooTransport);

...
}

_protooUrl 根据页面 URL 进行构造,如果页面 URL 中没有指定,程序会自动生成 roomId 和 peerId 参数:

wss://192.168.28.164:4443/?roomId=cgxbcht8&peerId=5pwtlewx

2.4. mediasoup-version

Websocket 连接创建成功后,服务器会立即向客户端发送版本信息,版本号是从服务端 SDK 获取。

handleProtooConnection({ peerId, consume, protooWebSocketTransport })
{
...

// Notify mediasoup version to the peer.
peer.notify('mediasoup-version', { version: mediasoup.version })
.catch(() => {});

...
}

服务端 SDK 中导出了 version,version 信息存储在 package.json 中。

...
export const version: string = require('../../package.json').version;
...

package.json 中 version 配置如下。

{
"name": "mediasoup",
"version": "3.14.6",
...
}

2.5. createRouter

客户端 WebSocket 连接请求会携带 roomId,服务器会检查 roomId 是否已经存在,如果不存在则创建房间。

async function runProtooWebSocketServer()
{
...
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{
...
queue.push(async () =>
{
const room = await getOrCreateRoom({ roomId, consumerReplicas });
...
})
...
});
...
}
async function getOrCreateRoom({ roomId, consumerReplicas })
{
// 从map中寻找room
let room = rooms.get(roomId);

// 不存在则创建
if (!room)
{
logger.info('creating a new Room [roomId:%s]', roomId);

// 获取一个可用的worker(程序启动时已创建)
const mediasoupWorker = getMediasoupWorker();

// 创建Room
room = await Room.create({ mediasoupWorker, roomId, consumerReplicas });

// 保存Room
rooms.set(roomId, room);

// 监听room的close事件,从rooms移除关闭的房间
room.on('close', () => rooms.delete(roomId));
}

return room;
}

Room::create 方法中,会调用接口 createRouter 创建 router。可以看出,一个 room 对应一个 router

static async create({ mediasoupWorker, roomId, consumerReplicas })
{
logger.info('create() [roomId:%s]', roomId);

// Create a protoo Room instance.
const protooRoom = new protoo.Room();

// Router media codecs.
const { mediaCodecs } = config.mediasoup.routerOptions;

// Create a mediasoup Router.
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });

...
}

createRouter 的参数在 config.js 中配置,用来描述 router 的媒体能力。

module.exports =
{
...

routerOptions :
{
mediaCodecs :
[
{
kind      : 'audio',
mimeType  : 'audio/opus',
clockRate : 48000,
channels  : 2
},
{
kind       : 'video',
mimeType   : 'video/VP8',
clockRate  : 90000,
parameters :
{
'x-google-start-bitrate' : 1000
}
},
{
kind       : 'video',
mimeType   : 'video/VP9',
clockRate  : 90000,
parameters :
{
'profile-id'             : 2,
'x-google-start-bitrate' : 1000
}
},
{
kind       : 'video',
mimeType   : 'video/h264',
clockRate  : 90000,
parameters :
{
'packetization-mode'      : 1,
'profile-level-id'        : '4d0032',
'level-asymmetry-allowed' : 1,
'x-google-start-bitrate'  : 1000
}
},
{
kind       : 'video',
mimeType   : 'video/h264',
clockRate  : 90000,
parameters :
{
'packetization-mode'      : 1,
'profile-level-id'        : '42e01f',
'level-asymmetry-allowed' : 1,
'x-google-start-bitrate'  : 1000
}
}
]
},
  ...
}

createRouter 定义如下。生成 routeId,并生成创建 router 所需的 rtpCapabilities,然后向 Worker 发送管道消息创建 router。服务器支持的能力集在supportedRtpCapabilities.ts 文件中定义。

async createRouter<RouterAppData extends AppData = AppData>({
mediaCodecs,
appData,
}: RouterOptions<RouterAppData> = {}): Promise<Router<RouterAppData>> {
...

// Clone given media codecs to not modify input data.
const clonedMediaCodecs = utils.clone<RtpCodecCapability[] | undefined>(
mediaCodecs
);

// 生成RtpCapabilities,传入的codecs匹配服务器支持codecs
const rtpCapabilities =
ortc.generateRouterRtpCapabilities(clonedMediaCodecs);

// 生成UUID作为Router ID
const routerId = utils.generateUUIDv4();

// 构建请求,请求参数只有routeId
const createRouterRequestOffset = new FbsWorker.CreateRouterRequestT(
routerId
).pack(this.#channel.bufferBuilder);

// 发送pipe消息并等待响应
await this.#channel.request(
FbsRequest.Method.WORKER_CREATE_ROUTER,
FbsRequest.Body.Worker_CreateRouterRequest,
createRouterRequestOffset
);

// rtpCapabilities保存在SDK创建的Route对象中
const data = { rtpCapabilities };
const router = new Router<RouterAppData>({
internal: {
routerId,
},
data,
channel: this.#channel,
appData,
});

this.#routers.add(router);
router.on('@close', () => this.#routers.delete(router));

// Emit observer event.
this.#observer.safeEmit('newrouter', router);

return router;
}

3. 准备阶段(Prepare)

准备阶段主要是获取服务器能力集,提前在服务器上创建一个发送transport和接收transport,为协商完成后的媒体传输做准备。

3.1. getRouterRtpCapabilities 

客户端在 WebSocket 连接创建成功后会调用 _joinRoom() 方法,获取服务器的能力集。

async join()
{
...

this._protoo.on('open', () => this._joinRoom());

...
}

_joinRoom 方法中会发送 getRouterRtpCapabilities 消息并同步等待服务器响应。

async _joinRoom()
{
try
{
...

const routerRtpCapabilities =
await this._protoo.request('getRouterRtpCapabilities');
...
}
...
}

服务器收到请求,直接返回在 createRouter 阶段生成 rtpCapabilities。

async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
case 'getRouterRtpCapabilities':
{
accept(this._mediasoupRouter.rtpCapabilities);

break;
}
...
}
...
}

3.2. load

Device 是 mediasoup 客户端 SDK 的主要导出类,代表一个通信设备。在获取服务器的能力集后,客户端应立即使用服务器能力集作为参数初始化 Device。

async _joinRoom()
{
...

const routerRtpCapabilities =
await this._protoo.request('getRouterRtpCapabilities');

await this._mediasoupDevice.load({ routerRtpCapabilities });

...
}

Device 会基于本地和服务器能力集,计算本端媒体接收能力集和 SCTP 能力集。在计算本端接收能力集时,首先匹配 primary codec,再匹配远端支持的 rtx codec。客户端媒体接收能力集,rtx codec由服务器决定。

async load({routerRtpCapabilities,}: {routerRtpCapabilities: RtpCapabilities;})
: Promise<void> {
...

// 拷贝本地和服务器能力集
const clonedRouterRtpCapabilities = utils.clone<RtpCapabilities>(
routerRtpCapabilities
);
const nativeRtpCapabilities = await handler.getNativeRtpCapabilities();
const clonedNativeRtpCapabilities = utils.clone<RtpCapabilities>(
nativeRtpCapabilities
);

// 对两个能力集取交集
this._extendedRtpCapabilities = ortc.getExtendedRtpCapabilities(
clonedNativeRtpCapabilities,
clonedRouterRtpCapabilities
);

// 生成媒体接收能力集
this._recvRtpCapabilities = ortc.getRecvRtpCapabilities(
this._extendedRtpCapabilities
);

// 生成SCTP能力集
this._sctpCapabilities = await handler.getNativeSctpCapabilities();

...
}

3.3. createWebRtcTransport

客户端可以通过在 URL 中添加“produce=false”来指示本端不发送媒体,如下所示:

https://192.168.28.164:3000/?roomId=6i7vwgur&produce=false

如果没有设置“produce=false”,则会请求服务器创建一个用来发送媒体的 WebRtcTransport。

还可以在 URL 中添加“forceTcp=true”来强制使用 TCP 来传输媒体,如下所示:

https://192.168.28.164:3000/?roomId=6i7vwgur&forceTcp=true

async _joinRoom()
{
...
if (this._produce)
{
// 请求创建WebRtcTransport
const transportInfo = await this._protoo.request(
'createWebRtcTransport',
{
forceTcp         : this._forceTcp, // 是否使用TCP传输媒体
producing        : true,           // 支持发送媒体
consuming        : false,          // 不支持接收媒体
sctpCapabilities : this._useDataChannel
? this._mediasoupDevice.sctpCapabilities
: undefined
});
...
}
...
}

服务器收到 createWebRtcTransport 消息处理逻辑如下:

async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
...
case 'createWebRtcTransport':
{
// 构造createWebRtcTransport参数

const {
forceTcp,
producing,
consuming,
sctpCapabilities
} = request.data;

const webRtcTransportOptions =
{
// WebRtcServer配置参数(监听地址和端口等信息)
...utils.clone(config.mediasoup.webRtcTransportOptions),
// 基于WebRtcServer创建Transport
webRtcServer      : this._webRtcServer,
// ICE应答超时时间???
iceConsentTimeout : 20,
enableSctp        : Boolean(sctpCapabilities),
numSctpStreams    : (sctpCapabilities || {}).numStreams,
appData           : { producing, consuming }
};

// 如果强制使用TCP传输媒体,则过滤非TCP地址
if (forceTcp)
{
webRtcTransportOptions.listenInfos = webRtcTransportOptions.listenInfos
.filter((listenInfo) => listenInfo.protocol === 'tcp');

webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}

// 调用Route接口创建Transport
const transport =
await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);

...

// Store the WebRtcTransport into the protoo Peer data Object.
peer.data.transports.set(transport.id, transport);

// 返回给客户端的信息
accept(
{
id             : transport.id,
iceParameters  : transport.iceParameters,
iceCandidates  : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters,
sctpParameters : transport.sctpParameters
});

...

break;
}

在 createWebRtcTransport 方法中会生成 transportId,然后向 Worker 进程发送管道消息,如果 transport 是建立在 webRtcServer 之上,则发送 ROUTER_CREATE_WEBRTCTRANSPORT_WITH_SERVER 消息,否则发送 ROUTER_CREATE_WEBRTCTRANSPORT 消息。

async createWebRtcTransport<
WebRtcTransportAppData extends AppData = AppData,
>({
webRtcServer,
listenInfos,
listenIps,
port,
enableUdp,
enableTcp,
preferUdp = false,
preferTcp = false,
initialAvailableOutgoingBitrate = 600000,
enableSctp = false,
numSctpStreams = { OS: 1024, MIS: 1024 },
maxSctpMessageSize = 262144,
sctpSendBufferSize = 262144,
iceConsentTimeout = 30,
appData,
}: WebRtcTransportOptions<WebRtcTransportAppData>): Promise<
WebRtcTransport<WebRtcTransportAppData>
> {
...

// 生成UUID作为transportId
const transportId = generateUUIDv4();

...

// 向Worker进程发送pipe消息
const response = await this.#channel.request(
webRtcServer
? FbsRequest.Method.ROUTER_CREATE_WEBRTCTRANSPORT_WITH_SERVER
: FbsRequest.Method.ROUTER_CREATE_WEBRTCTRANSPORT,
FbsRequest.Body.Router_CreateWebRtcTransportRequest,
requestOffset,
this.#internal.routerId
);

...
}

3.4. createSendTransport

客户端收到 createWebRtcTransport 响应后,会调用 Device 接口创建 SendTransport,与服务器上创建的WebRtcTransport 遥相呼应。音视频复用同一个SendTransport。

async _joinRoom()
{
...

// 从服务器获取的一系列参数,用来创建本地Transport
const {
id,
iceParameters,
iceCandidates,
dtlsParameters,
sctpParameters
} = transportInfo;

// 本地创建SendTransport
this._sendTransport = this._mediasoupDevice.createSendTransport(
{
id,
iceParameters,
iceCandidates,
dtlsParameters :
{
...dtlsParameters,
role : 'auto'
},
sctpParameters,
iceServers             : [],
proprietaryConstraints : PC_PROPRIETARY_CONSTRAINTS,
additionalSettings    :
{ encodedInsertableStreams: this._e2eKey && e2e.isSupported() }
});
...
}

createSendTransport 最终会调用 Chrome111::run(以 Chrome111 作为 HandlerInterface 实例进行说明,后同),内部会创建 PeerConnection 对象。

run({
direction,
iceParameters,
iceCandidates,
dtlsParameters,
sctpParameters,
iceServers,
iceTransportPolicy,
additionalSettings,
proprietaryConstraints,
extendedRtpCapabilities,
}: HandlerRunOptions): void {
logger.debug('run()');

// "send" or "recv"
this._direction = direction;

// 生成远端SDP,暂时没有媒体描述
this._remoteSdp = new RemoteSdp({
iceParameters,
iceCandidates,
dtlsParameters,
sctpParameters,
});

// extendedRtpCapabilities是调用Device.load方法时生成的
// 可以认为协商后的能力集

// 本端能力集
this._sendingRtpParametersByKind = {
audio: ortc.getSendingRtpParameters('audio', extendedRtpCapabilities),
video: ortc.getSendingRtpParameters('video', extendedRtpCapabilities),
};

// 远端能力集
this._sendingRemoteRtpParametersByKind = {
audio: ortc.getSendingRemoteRtpParameters(
'audio',
extendedRtpCapabilities
),
video: ortc.getSendingRemoteRtpParameters(
'video',
extendedRtpCapabilities
),
};

if (dtlsParameters.role && dtlsParameters.role !== 'auto') {
this._forcedLocalDtlsRole =
dtlsParameters.role === 'server' ? 'client' : 'server';
}

// 创建PeerConnection
this._pc = new (RTCPeerConnection as any)(
{
iceServers: iceServers || [],
iceTransportPolicy: iceTransportPolicy || 'all',
bundlePolicy: 'max-bundle',
rtcpMuxPolicy: 'require',
sdpSemantics: 'unified-plan',
...additionalSettings,
},
proprietaryConstraints
);

...
}

3.5. createWebRtcTransport

客户端端可以通过在 URL 中添加“consume=false”来指示本端不接收媒体,如下所示:

https://192.168.28.164:3000/?roomId=6i7vwgur&consume=false

如果没有设置“consume=false”,则会请求服务器创建另一个用来接收媒体的 WebRtcTransport。

async _joinRoom()
{
...

if (this._consume)
{
// 请求创建WebRtcTransport
const transportInfo = await this._protoo.request(
'createWebRtcTransport',
{
forceTcp         : this._forceTcp, // 是否使用TCP传输媒体
producing        : false,          // 不支持发送媒体
consuming        : true,           // 支持接收媒体
sctpCapabilities : this._useDataChannel
? this._mediasoupDevice.sctpCapabilities
: undefined
});
...
}
...
}

服务器收到createWebRtcTransport请求后,处理逻辑与1.2.3节一致,不再赘述。

3.6. createRecvTransport

客户端收到服务器响应后,会调用 Device 接口创建 RecvTransport。音视频复用同一个RecvTransport。

async _joinRoom()
{
...

// 从服务器获取的一系列参数,用来创建本地Transport
const {
id, // 服务器生成的transport ID
iceParameters,
iceCandidates,
dtlsParameters,
sctpParameters
} = transportInfo;

// 创建本地RecvTransport
this._recvTransport = this._mediasoupDevice.createRecvTransport(
{
id,
iceParameters,
iceCandidates,
dtlsParameters :
{
...dtlsParameters,
role : 'auto'
},
sctpParameters,
iceServers        : [],
additionalSettings :
{ encodedInsertableStreams: this._e2eKey && e2e.isSupported() }
});

...
}

createRecvTransport 最终会调用 Chrome111::run 方法,创建 PeerConnection 对象,相关处理逻辑与 createSendTransport 基本一致,不再赘述。

4. 加入房间(Join)

准备工作完成后,就可以加入房间了。加入房间的目的,一是与服务器交换能力集,这样客户端和服务器才能够完成能力协商;二是交换 peer 信息,这样才能够知道其他 peer 的存在并从其他 peer 接收媒体。加入房间时,如果本地要接收媒体,则需要携带本地能力集(房间中可能有其他 peer 正在发送媒体,节省一次协议交互)。

async _joinRoom()
{
...
const { peers } = await this._protoo.request(
'join',
{
displayName     : this._displayName,
device          : this._device,
rtpCapabilities : this._consume
? this._mediasoupDevice.rtpCapabilities
: undefined,
sctpCapabilities : this._useDataChannel && this._consume
? this._mediasoupDevice.sctpCapabilities
: undefined
});
...
}

服务器会返回房间中其他 peer,如果房间中其他用户正在发送媒体,要通知当前加入房间的 peer 接收媒体,接收媒体的逻辑放在后面再说,这里先按下不表。

async _handleProtooRequest(peer, request, accept, reject)
{
...
case 'join':
{
// Ensure the Peer is not already joined.
if (peer.data.joined)
throw new Error('Peer already joined');

const {
displayName,
device,
rtpCapabilities,
sctpCapabilities
} = request.data;

// Store client data into the protoo Peer data object.
peer.data.joined = true;
peer.data.displayName = displayName;
peer.data.device = device;
peer.data.rtpCapabilities = rtpCapabilities;
peer.data.sctpCapabilities = sctpCapabilities;

// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.

const joinedPeers =
[
...this._getJoinedPeers(),
...this._broadcasters.values()
];

// 返回除自己之外的Peer列表
const peerInfos = joinedPeers
.filter((joinedPeer) => joinedPeer.id !== peer.id)
.map((joinedPeer) => ({
id          : joinedPeer.id,
displayName : joinedPeer.data.displayName,
device      : joinedPeer.data.device
}));
accept({ peers: peerInfos });

...
}
...
}

5. Send media

加入房间后,如果本地存在音视频设备,并且策略允许发送,则立即开始发送本地的音频和视频。但发送媒体的前提是完成 SDP 协商,并在服务器创建对应的 producer。

5.1. Client::produce

加入房间成功后,如果允许发送媒体,则打开麦克风和摄像头。由于已经获取到服务器的能力集,并且创建了SendTransport,已经有充足的信息完成发送媒体的协商。

async _joinRoom()
{
...

if (this._produce)
{
...

// 打开麦克风
this.enableMic();

const devicesCookie = cookiesManager.getDevices();

// 打开摄像头
if (!devicesCookie || devicesCookie.webcamEnabled || this._externalVideo)
this.enableWebcam();

...
}
...
}

以 enableMic 为例,其内部会调用本地 SendTransport::produce 方法。

async enableMic()
{
...

this._micProducer = await this._sendTransport.produce(
{
track,
codecOptions :
{
opusStereo : true,
opusDtx    : true,
opusFec    : true,
opusNack   : true
}
});
...
}

SentTransport::produce 方法内部先调用 Chrome111::send 方法。

async produce<ProducerAppData extends AppData = AppData>({
track,
encodings,
codecOptions,
codec,
stopTracks = true,
disableTrackOnPause = true,
zeroRtpOnPause = false,
onRtpSender,
appData = {} as ProducerAppData,
}: ProducerOptions<ProducerAppData> = {}): Promise<
Producer<ProducerAppData>
> {
...

const { localId, rtpParameters, rtpSender } =
await this._handler.send({
track,
encodings: normalizedEncodings,
codecOptions,
codec,
onRtpSender,
});

...
}

Chrome111::send 方法内部调用 setupTransport 准备 transport,然后完成 PeerConnection SDP 协商。

async send({track, encodings, codecOptions, codec,}
    : HandlerSendOptions): Promise<HandlerSendResult> {
...

// 调用PeerConnection接口添加transceiver,相当于添加SDP中的media section
// 方向为sendonly
const transceiver = this._pc.addTransceiver(track, {
direction: 'sendonly',
streams: [this._sendStream],
});

// 发送媒体需要客户端创建Offer
let offer = await this._pc.createOffer();

// 同步等待设置服务器WebRtcTransport参数
if (!this._transportReady) {
await this.setupTransport({
localDtlsRole: this._forcedLocalDtlsRole ?? 'client',
localSdpObject,
});
}

...

await this._pc.setLocalDescription(offer);

...

await this._pc.setRemoteDescription(answer);

...
}

Chrome111::setupTransport 方法触发 @connect 事件,携带本端 DTLS 参数。

private async setupTransport({
localDtlsRole,
localSdpObject,
}: {
localDtlsRole: DtlsRole;
localSdpObject?: any;
}): Promise<void> {
if (!localSdpObject) {
localSdpObject = sdpTransform.parse(this._pc.localDescription.sdp);
}

// Get our local DTLS parameters.
const dtlsParameters = sdpCommonUtils.extractDtlsParameters({
sdpObject: localSdpObject,
});

// Set our DTLS role.
dtlsParameters.role = localDtlsRole;

// Update the remote DTLS role in the SDP.
this._remoteSdp!.updateDtlsRole(
localDtlsRole === 'client' ? 'server' : 'client'
);

// Need to tell the remote transport about our parameters.
// 触发@connect事件,通知服务器
await new Promise<void>((resolve, reject) => {
this.safeEmit('@connect', { dtlsParameters }, resolve, reject);
});

this._transportReady = true;
}

Transport 监听 @connect 事件进一步触发 connect 事件。

handler.on(
'@connect',
(
{ dtlsParameters }: { dtlsParameters: DtlsParameters },
callback: () => void,
errback: (error: Error) => void
) => {
...

this.safeEmit('connect', { dtlsParameters }, callback, errback);
}
);

5.2. connectWebRtcTransport

RoomClient 监听 connect 事件,向服务器发送 connectWebRtcTransport 请求,携带本端 DTLS 参数。

this._sendTransport.on(
'connect', ({ iceParameters, dtlsParameters }, callback, errback) =>
{
this._protoo.request(
'connectWebRtcTransport',
{
transportId : this._sendTransport.id,
iceParameters,
dtlsParameters
})
.then(callback)
.catch(errback);
});

服务器收到 connectWebRtcTransport 请求,调用 WebRtcTransport::connect 接口。

async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
...
case 'connectWebRtcTransport':
{
const { transportId, dtlsParameters } = request.data;
const transport = peer.data.transports.get(transportId);

if (!transport)
throw new Error(`transport with id "${transportId}" not found`);

await transport.connect({ dtlsParameters });

accept();

break;
}
...
}
...
}

WebRtcTransport::connect 方法会向 Worker 进程发送 WEBRTCTRANSPORT_CONNECT 消息,Worker 会设置 WebRTCTransport 的 DTLS 参数,并确定 DTLS 角色。

async connect({dtlsParameters,}: {dtlsParameters: DtlsParameters;}): Promise<void> {
logger.debug('connect()');

// 携带DTLS参数
const requestOffset = createConnectRequest({
builder: this.channel.bufferBuilder,
dtlsParameters,
});

// 同步发送pipe消息
const response = await this.channel.request(
FbsRequest.Method.WEBRTCTRANSPORT_CONNECT,
FbsRequest.Body.WebRtcTransport_ConnectRequest,
requestOffset,
this.internal.transportId
);

/* Decode Response. */
const data = new FbsWebRtcTransport.ConnectResponse();

response.body(data);

// 设置DTLS角色:'auto' | 'client' | 'server'
this.#data.dtlsParameters.role = dtlsRoleFromFbs(data.dtlsLocalRole());
}

5.3. Server::produce

WebRtcTransport 创建成功后,服务器回复 connectWebRtcTransport 响应,一路返回到调用 Chrome111::setupTransport 处,调用 setLocalDescription 和 setRemoteDescription 完成 PeerConnection SDP 协商,并触发“produce”事件。

async produce<ProducerAppData extends AppData = AppData>({
track,
encodings,
codecOptions,
codec,
stopTracks = true,
disableTrackOnPause = true,
zeroRtpOnPause = false,
onRtpSender,
appData = {} as ProducerAppData,
}: ProducerOptions<ProducerAppData> = {}): Promise<
Producer<ProducerAppData>
> {
...

const { localId, rtpParameters, rtpSender } =
await this._handler.send({
track,
encodings: normalizedEncodings,
codecOptions,
codec,
onRtpSender,
});

...

const { id } = await new Promise<{ id: string }>(
(resolve, reject) => {
this.safeEmit(
'produce',
{
kind: track.kind as MediaKind,
rtpParameters,
appData,
},
resolve,
reject
);
}
);

RoomClient 在创建 SendTransport 时,已经监听了“produce”事件,客户端 SDP 协商已经完成,需要向服务器发送 produce 请求,在服务器上创建 producer。

async _joinRoom()
{
...
this._sendTransport.on(
'produce', async ({ kind, rtpParameters, appData }, callback, errback) =>
{
try
{
// eslint-disable-next-line no-shadow
const { id } = await this._protoo.request(
'produce',
{
transportId : this._sendTransport.id,
kind,
rtpParameters,
appData
});

callback({ id });
}
catch (error)
{
errback(error);
}
});
...
}

服务器收到 produce 请求,调用 Transport::produce 方法。

async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
case 'produce':
{
...
const producer = await transport.produce(
{
kind,
rtpParameters,
appData
// keyFrameRequestDelay: 5000
});

// Store the Producer into the protoo Peer data Object.
peer.data.producers.set(producer.id, producer);

...

accept({ id: producer.id });

// Optimization: Create a server-side Consumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createConsumer(
{
consumerPeer : otherPeer,
producerPeer : peer,
producer
});
}

...

break;
}

Transport::produce 方法向 Worker 发送 TRANSPORT_PRODUCE 消息,Worker 创建 producer 对象。至此,媒体发送的前期工作都已完成,浏览器开始连接 Worker 服务端口开始媒体传输。

async produce<ProducerAppData extends AppData = AppData>({
id = undefined,
kind,
rtpParameters,
paused = false,
keyFrameRequestDelay,
appData,
}: ProducerOptions<ProducerAppData>): Promise<Producer<ProducerAppData>> {
logger.debug('produce()');

if (id && this.#producers.has(id)) {
throw new TypeError(`a Producer with same id "${id}" already exists`);
} else if (!['audio', 'video'].includes(kind)) {
throw new TypeError(`invalid kind "${kind}"`);
} else if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}

// Clone given RTP parameters to not modify input data.
const clonedRtpParameters = utils.clone<RtpParameters>(rtpParameters);

// This may throw.
ortc.validateRtpParameters(clonedRtpParameters);

// If missing or empty encodings, add one.
if (
!clonedRtpParameters.encodings ||
!Array.isArray(clonedRtpParameters.encodings) ||
clonedRtpParameters.encodings.length === 0
) {
clonedRtpParameters.encodings = [{}];
}

// Don't do this in PipeTransports since there we must keep CNAME value in
// each Producer.
if (this.constructor.name !== 'PipeTransport') {
// If CNAME is given and we don't have yet a CNAME for Producers in this
// Transport, take it.
if (
!this.#cnameForProducers &&
clonedRtpParameters.rtcp &&
clonedRtpParameters.rtcp.cname
) {
this.#cnameForProducers = clonedRtpParameters.rtcp.cname;
}
// Otherwise if we don't have yet a CNAME for Producers and the RTP
// parameters do not include CNAME, create a random one.
else if (!this.#cnameForProducers) {
this.#cnameForProducers = utils.generateUUIDv4().substr(0, 8);
}

// Override Producer's CNAME.
clonedRtpParameters.rtcp = clonedRtpParameters.rtcp ?? {};
clonedRtpParameters.rtcp.cname = this.#cnameForProducers;
}

const routerRtpCapabilities = this.#getRouterRtpCapabilities();

// This may throw.
const rtpMapping = ortc.getProducerRtpParametersMapping(
clonedRtpParameters,
routerRtpCapabilities
);

// This may throw.
const consumableRtpParameters = ortc.getConsumableRtpParameters(
kind,
clonedRtpParameters,
routerRtpCapabilities,
rtpMapping
);

const producerId = id || utils.generateUUIDv4();
const requestOffset = createProduceRequest({
builder: this.channel.bufferBuilder,
producerId,
kind,
rtpParameters: clonedRtpParameters,
rtpMapping,
keyFrameRequestDelay,
paused,
});

// 发送pipe消息
const response = await this.channel.request(
FbsRequest.Method.TRANSPORT_PRODUCE,
FbsRequest.Body.Transport_ProduceRequest,
requestOffset,
this.internal.transportId
);

/* Decode Response. */
const produceResponse = new FbsTransport.ProduceResponse();

response.body(produceResponse);

const status = produceResponse.unpack();

const data = {
kind,
rtpParameters: clonedRtpParameters,
type: producerTypeFromFbs(status.type),
consumableRtpParameters,
};

const producer = new Producer<ProducerAppData>({
internal: {
...this.internal,
producerId,
},
data,
channel: this.channel,
appData,
paused,
});

this.#producers.set(producer.id, producer);
producer.on('@close', () => {
this.#producers.delete(producer.id);
this.emit('@producerclose', producer);
});

// 通知transport新增了一个producer
this.emit('@newproducer', producer);

// Emit observer event.
this.#observer.safeEmit('newproducer', producer);

return producer;
}

6. Receive Media

客户端从服务器接收媒体,有两个触发场景,处理逻辑是一样的。

1)Peer 加入房间时,如果房间中已经有其他 peeer 在发送媒体,则需要通知刚加入房间的 peer 接收其他 peer 发送的媒体。

2)Peer 加入房间后,开始发送媒体,服务器需要通知其他 peer 接收此 peer 发送的媒体。

6.1. newConsumer

当需要通知客户端接收媒体时,服务器会调用 _createConsumer 方法。_createConsumer 会先调用 consume 方法在 Worker 上创建 Consumer,然后向客户端发送 newConsumer 反向请求(带响应的通知)。

async _createConsumer({ consumerPeer, producerPeer, producer })
{
...

for (let i=0; i<consumerCount; i++)
{
promises.push(
(async () =>
{
// Create the Consumer in paused mode.
let consumer;

try
{
consumer = await transport.consume(
{
producerId      : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities,
// Enable NACK for OPUS.
enableRtx       : true,
paused          : true
});
}

...

// Send a protoo request to the remote Peer with Consumer parameters.
try
{
await consumerPeer.request(
'newConsumer',
{
peerId         : producerPeer.id,
producerId     : producer.id,
id             : consumer.id,
kind           : consumer.kind,
rtpParameters  : consumer.rtpParameters,
type           : consumer.type,
appData        : producer.appData,
producerPaused : consumer.producerPaused
});

...
})()
);
}

...
}

Transport::consume 方法向 Worker 发送 TRANSPORT_CONSUME 消息在 Worker 上创建 Consumer。创建成功后,触发“newConsumer”事件。

async consume<ConsumerAppData extends AppData = AppData>({
producerId,
rtpCapabilities,
paused = false,
mid,
preferredLayers,
ignoreDtx = false,
enableRtx,
pipe = false,
appData,
}: ConsumerOptions<ConsumerAppData>): Promise<Consumer<ConsumerAppData>> {
...

  // 构造请求
const consumerId = utils.generateUUIDv4();
const requestOffset = createConsumeRequest({
builder: this.channel.bufferBuilder,
producer,
consumerId,
rtpParameters,
paused,
preferredLayers,
ignoreDtx,
pipe,
});

// 向 Worker 发送消息创建 Consumer 并同步等待
const response = await this.channel.request(
FbsRequest.Method.TRANSPORT_CONSUME,
FbsRequest.Body.Transport_ConsumeRequest,
requestOffset,
this.internal.transportId
);

...

// 创建并保存 consumer
const consumer = new Consumer<ConsumerAppData>({});
this.consumers.set(consumer.id, consumer);
consumer.on('@close', () => this.consumers.delete(consumer.id));
consumer.on('@producerclose', () => this.consumers.delete(consumer.id));

// 触发 newConsumer 事件
this.#observer.safeEmit('newconsumer', consumer);

return consumer;
}

6.2. consume

RoomClient 监听 newConsumer 事件,调用 RecvTransport 的 consume 方法。

this._protoo.on('request', async (request, accept, reject) =>
{
switch (request.method)
{
case 'newConsumer':
{
...
const {
peerId,
producerId,
id,
kind,
rtpParameters,
type,
appData,
producerPaused
} = request.data;

const consumer = await this._recvTransport.consume(
{
id,
producerId,
kind,
rtpParameters,
streamId : `${peerId}-${appData.share ? 'share' : 'mic-webcam'}`,
appData  : { ...appData, peerId } // Trick.
});
...
}
...
}
...
}

consume 方法最终会调用 chrome111::receive 方法。

async consume<ConsumerAppData extends AppData = AppData>({
id,
producerId,
kind,
rtpParameters,
streamId,
onRtpReceiver,
appData = {} as ConsumerAppData,
}: ConsumerOptions<ConsumerAppData>): Promise<Consumer<ConsumerAppData>> {
...

const consumerCreationTask = new ConsumerCreationTask({
id,
producerId,
kind,
rtpParameters: clonedRtpParameters,
streamId,
onRtpReceiver,
appData,
});

// Store the Consumer creation task.
this._pendingConsumerTasks.push(consumerCreationTask);

// 这里使用微任务进行调度
queueMicrotask(() => {
if (this._closed) {
return;
}

if (this._consumerCreationInProgress === false) {
this.createPendingConsumers<ConsumerAppData>();
}
});

return consumerCreationTask.promise as Promise<Consumer<ConsumerAppData>>;
}
private async createPendingConsumers<ConsumerAppData extends AppData,>(): Promise<void> {
...
const results = await this._handler.receive(optionsList);
...
}

由于客户端在连接服务器的时候已经拿到了服务器的能力集,可以依次执行以下动作:

1)receive 方法构造 offer 并调用 setRemoteDescription。

2)创建 answer 成功后,立即调用 connectWebRtcTransport 通知服务器准备好 transport(具体参考produce,不再赘述)。

3)同步等待服务器响应,使用创建的 answer 调用 setLocalDescription,完成 PeerConnection SDP 协商。
至此,接收通道已经准备好,接下来客户端可以连接 Worker 服务开始接受媒体。

async receive(optionsList: HandlerReceiveOptions[]): Promise<HandlerReceiveResult[]> {
...

await this._pc.setRemoteDescription(offer);

let answer = await this._pc.createAnswer();

...

if (!this._transportReady) {
await this.setupTransport({
localDtlsRole: this._forcedLocalDtlsRole ?? 'client',
localSdpObject,
});
}

await this._pc.setLocalDescription(answer);

...
}

7. 总结

如果只是完成基本的媒体通信,相比基于 SDP 的 WebRTC 协商,使用 mediasoup的 WebRTC 协商看起来更加复杂,这种感觉是正常的。这是因为 mediasoup 使用 ORTC 接口规范,ORTC 旨在简化 WebRTC 的 API,提供更加模块化和面向对象的接口,它把 WebRTC 的一整坨 API 进行了面向对象的细粒度模块划分,能够实现更灵活的控制。但这种模块化的拆分也带来了复杂性,以前的协商只需要处理 SDP 即可(包罗万象),现在要分别处理 Transport、DTLS、能力集等,再加上 mediasoup 服务器上 Worker/Producer/Consumer/Transport 等概念,如下图所示,使得 mediasoup 的交互流程理解起来更加困难。

如果你深入阅读 mediasoup client SDK 源码,会发现 SDP 与 ORTC 之间的转换逻辑更加复杂,客户端的实现其实也不简单,要真正吃透 mediaosoup client SDK 实现,需要对 WebRTC、SDP、ORTC 都有相当的理解才行。这超出了本文所讨论的范围。


原文地址:https://blog.csdn.net/zhh157/article/details/140606357

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!