- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
直播长链接架构演进
演进过程总结
- 直播立项 实现速度 复用已有代码,快速实现
- 直播公测 代码协议耦合 解耦:独立代码/协议/部署
- 直播全量 稳定性 控制消息量:礼物连击/状态更新/消息限频
- 直播运营 稳定性 链接解耦:独立H5长连接资源
展开查看详情
1 .探探直播⻓链接架构的演进 周长斌
2 .定位 架构 • 长链接服务 • 长链接服务 在直播业务 的架构 中的作用 演进 总结 • 长链接服务 • 收获 的演进过程 • 规划
3 .定位 长链接服务 在直播业务 中的作用
4 . 认证 RoomID 主播 主播和观众都是普通用户,唯一标识:UserID UserID 主播认证后绑定房间:RoomID 观看 观众进入直播间观看直播,资源独立(重新连接长链接) 观众
5 .
6 .系统消息 礼物消息 聊天消息
7 .系统消息 礼物消息 长链接 聊天消息
8 . 长链接 架构 服务的 架构
9 .1.请求IP列表 2.返回IP列表 3.请求连接 长链接 4.建立连接 5.通信
10 .1.请求IP列表 2.返回IP列表 业务数据 3.请求连接 长链接 业务 4.建立连接 5.通信
11 .1.请求IP列表 Dispatcher 2.返回IP列表 (http) 业务数据 3.请求连接 Router 业务 (grpc) 4.建立连接 Connector (tcp) 5.通信
12 .Dispatcher (http) Router (grpc) Connector (tcp)
13 .dispatcher HTTP 接入层 connector router 架构 TCP GRPC 长链接 业务 dispatcher 1. HTTP 2. HTTP Access Point Live 3.TCP Connector IP1 4. GRPC Chat Connector router IP2 Gift Connector 5. GRPC Admin IP3
14 . Dispatcher dispatcher router connector // 提供Http接⼝,返回⻓链接connector节点的IP列表 func (s DispatcherServer) Route(g *gin.Engine) { g.GET("/v2/live-metadata", s.liveMetadata) } /* * 1. 随机返回4个IP地址 * 2. 根据客户端信息返回对应的IP协议类型 */ realTcpAddrs = allTcpAddrs if len(allTcpAddrs) > 4 { realTcpAddrs, err = util.Random(allTcpAddrs, 4) if err != nil { realTcpAddrs = []string{allTcpAddrs[rand.Intn(len(allTcpAddrs))]} } }
15 . Connector dispatcher router connector 长链接数据结构 消息上行下行
16 . Connector dispatcher router connector type Client struct { client mutex sync.RWMutex 长链接数据结构 // The tcp connection. tcpConn *net.TCPConn tcpConn // The user info. UserId int UserID RoomId string RoomID roomType string } 0 … 255 // Store client by userId. var hub *Hub rid … rid // Store client by roomId. var tagMapList []TagMap type Hub struct { type Tag struct { // router is a array of 256 subRouter item users map[int]*Client router []*subRouter mutex *sync.RWMutex uid … uid uid … uid mutex *sync.RWMutex uid … uid uid … uid roomKey string } } type subRouter struct { type TagMap struct { //key is userId, value is Client point tags map[string]*Tag r map[int]*Client mutex *sync.RWMutex C C C C mutex *sync.RWMutex } C C C C }
17 . Connector dispatcher router connector conn, err := listener.AcceptTCP() if err != nil { slf.Errorw("AcceptTCP", slf.Error(err)) continue } 消息上行下行 client := NewClient(conn) go client.readPump() func (c *Client) readPump() { for { 服务启动 // Read. tcpData, err := c.dataPack.Unpack(c.tcpConn) // Handle message. Go Routine } } 等待连接 读数据 写数据 func (c *Client) write(data []byte) error { c.mutex.Lock() defer c.mutex.Unlock() 连接请求 if _, err := c.tcpConn.Write(data); err != nil { 处理消息 return err } return nil }
18 . Router dispatcher router connector service LiveRouterService { rpc RegisterRouter(RegisterRouterRequest) returns (RouterReply) {} // connector 注册 rpc UnregisterRouter(UnregisterRouterRequest) returns (RouterReply) {} //删除注册信息 rpc GetConnInfo(GetConnInfoRequest) returns (GetConnInfoReply) {} // 获取连接信息 rpc TransferUnicastMsg(TransferUnicastMsgRequest) returns(RouterReply) {} // 单播 rpc TransferMulticastMsg(TransferMulticastMsgRequest) returns(RouterReply) {} // 组播 rpc TransferBroadcastMsg(TransferBroadcastMsgRequest) returns(RouterReply) {} // ⼴播 } for _, liveConnectorClient := range liveConnectors { newCtx := tracing.PropagateContextWithServiceContext(ctx) go func(ctx context.Context, connectorCli *rpcclient.LiveConnectorClient) { defer commonutil.Recovery() connectorCli.TransferMessage(ctx, convertMsg) }(newCtx, liveConnectorClient) }
19 .dispatcher router connector
20 . • 100% GO • 精简 代码 • V1.0 三周 1人 • 4个大版本 效率 • 发现和解决问题的速度快 • W级链接数 • 带宽 600M/6G 容量
21 . 长链接服 演进 务的演进 过程
22 .直播立项 快速实现
23 .直播立项 直播公测 独立部署
24 .直播立项 直播全量 直播公测 稳定性优化
25 .直播立项 直播全量 直播公测 运营 H5长链接
26 . 直播立项:快速实现 探探长链接 V1.0 直播长链接 V1.1 代码 分支:feature/live 协议 广播/组播 RoomID 部署 以AB实验的方式部署
27 . 直播立项:快速实现 msgEventChan := make(chan MessageEvent, ChanSize) for _, client := range roomClients { go client.sendMessage(msg) func (h *Hub) runSendMessage(poolSize int) { } for i := 0; i < poolSize; i++ { go func() { for event := range h.msgEventChan { 问题:房间人数多 消息量大 msg := event.message 2000人 QPS 400 ==> 80W go routine event.client.sendMessage(msg, event.msgInfo) Go routine 泄漏 } }() } } for _, c := range clients { msgEventChan <- MessageEvent{client : c , message: msg } }
28 . 直播公测:面临的问题 业务快速发展 代码耦合 业务同时依赖v1.0 和 v1.1 需要兼容 协议耦合 改动频繁,需要兼容 业务改动需要长链接支持的时候,效率较低
29 . 直播公测:面临的问题 代码耦合 协议耦合 业务代码依赖 Live合并tantan代码 业务依赖协议 Live合并tantan proto tantan live 解决冲突 tantan 解决冲突 live v1.0 v1.1 兼容逻辑 兼容逻辑 部署 部署