Reliable-UDP
Reliable-UDP
用 UDP 来实现一个可靠传输,这个问题变相的再考虑,「为什么不用 TCP」,而是要在 UDP 上实现类似的可靠传输。「TCP 有什么缺陷」以至于需要在 UDP 上“自己造一个轮子”。
首先回顾一下可靠传输的 TCP 有哪些性质:
- 连接建立的「三次握手」和释放的「四次挥手」
- 拥塞控制
- 顺序交付
- 全双工
- 报文头部(20字节起步)
如果说基于 UDP 实现可靠传输的效果可以比 TCP 做的更好,那么一定是舍弃了 TCP 需要做到的事情。
UDP 优化空间通常来自以下取舍:
- 放弃可靠性:允许丢包(如视频流、实时语音)
- 放弃有序性:接受乱序到达(如部分游戏状态同步)
- 放弃拥塞控制:自主控制速率(如 QUIC 的定制化拥塞控制)
- 放弃连接管理:无挥手握手(如 DNS)
在设计网络通信时,如果试图容忍丢包,这样看上去非常简单,但是在实际上业务逻辑会变得非常复杂。也就是说,如果丢了某些数据包,也不需要补发、重传,让上层逻辑自己去「容错」或者「忽略」。比如在游戏场景中,有些信息是临时的,比如角色坐标、摄像头朝向、鼠标移动位置,对于这些数据「旧的不一定有用,只要有最新的就行」,这时候丢掉中间几帧没有影响。为了让丢失中间状态也不会出问题,需要每一次都完整的发送全量状态。比如角色的位置不是用「偏移量」而是绝对位置,这样做即使丢失了前几帧,收到这一帧也知道最新的位置。
而如果容忍丢包,不保证数据一定能送到,那就要在业务逻辑中写各种检查、修复、兜底,比如判断状态是否完整、是否合理、是否需要重试等机制。但很多业务并不适合只用「最新状态」来替代「完整过程」,有些状态必须顺序到达,有些中间步骤也很重要,这时候全量同步就不适用了。
如果说在设计协议的时候,明确了不允许丢包,但不强制保证顺序(包可能会乱序到达)。既然可以保证所有的包都能送达,丢失的包也就会通过某种机制重新获得,那么事实上也可以同样保证次序。比如说可以通过包里的序列号来排序处理,顺序也就不是问题了。也就是说只要加了序号和重传,就已经非常接近 TCP 的可靠有序模型了。而这样实现与 TCP 的区别在于,允许业务能提前处理后到的包。DNS 就是一个包无序的典型案例,通过一问一答的方式请求回应。
在网络不稳定的时候,一个长期连接很可能因为某个瞬间的波动或丢包变得卡顿或无效,「短连接」可以及时中断、重新连接,从而更灵活的应对网络问题。每个短连接是一个独立的请求-响应事务,不像 TCP 的长连接那样受之前连接状态影响。短连接的好处在于,失败了没有关系,直接重新发送一个新请求,不用处理复杂的 TCP 状态(如 TIME_WAIT、FIN_WAIT)。这样做就把可靠性从传输层转移到了应用层,这样做的优势在于更灵活,但同时也需要自己去处理重试、幂等问题。
相较与 TCP 的三次握手和四次挥手,在传输小量数据时开销显得过于大,比如只是发送一个 500B 的 JSON 消息,却要 TCP 拿 7 个包起步,这种开销对于小消息是很「浪费」的。
Golang 实现简易 UDP 服务器
UDP 通信
用 Golang 先编写一个简单的 UDP 服务器,用于接受来自客户端的数据:
|
启动之后,用 nc
做一个测试:
|
把它封装成一个结构体,以便后续添加功能更加方便:
|
包结构设计
目前服务器的发送仅有数据部分,包的概念在可靠传输中比较关键,在传输过程中添加一些必要的信息,比如「确认号」、「序列号」等。
先设计一个简单的数据包结构,包含以下字段:
- 包类型:标识包的类型,比如
PING
、EOF
、ACK
等; - 序列号:用来标识数据的顺序;
- 确认号:接收方的响应;
- 载荷:实际传输的数据;
初步定义一下包的结构:
|
为服务端添加收发 Packet
的功能逻辑:
|
修改一下 OnData
回调函数,现在服务端处理的是以 Packet
为单位的数据,而不是字节流。
|
编写一个测试案例,不能使用 nc 来继续做测试了,因为使用了
gob
做二进制的编码。
|
优化 UDP Server 结构
目前 UDP Server
需要监听连接,同时需要处理读写请求。现在把这部分的逻辑分离,分为
Listener
与 Conn
。Listener
负责监听管理连接,Conn
代表具体某一个连接,Listener
不负责具体的可靠传输的逻辑。
大致实现 Listener
和 Conn
,目前
Conn
还没有实现任何关于可靠传输的内容,收发的还是原始的字节流,这部分的逻辑交给专门的类来处理。Conn
还没有实现具体的发送逻辑,只是抽象出了一个 Write
方法调用,具体来说将数据传递给一个 channel,那么需要有 channel
的数据接收者。
const (
MAX_PACKAGE = 0x7fff
)
// 负责对连接的监听与管理
type LtcpListener struct {
conn *net.UDPConn
lock sync.RWMutex
newLtcpConnChan chan *LtcpConn
newLtcpErr chan error
ltcpConnMap map[string]*LtcpConn
}
func NewLtcpListener(conn *net.UDPConn) *LtcpListener {
// TODO: 提供 option 选项
listen := &LtcpListener{
conn: conn,
newLtcpConnChan: make(chan *LtcpConn, 1024),
newLtcpErr: make(chan error, 12),
ltcpConnMap: make(map[string]*LtcpConn),
}
return listen
}
func (l *LtcpListener) Accept() (net.Conn, error) {
select {
case c := <-l.newLtcpConnChan:
return c, nil
case e := <-l.newLtcpErr:
return nil, e
}
}
func (l *LtcpListener) Close() error {
// 先断开所有维护的连接
l.closeAllUdpConn()
l.ltcpConnMap = make(map[string]*LtcpConn)
return l.conn.Close()
}
func (l *LtcpListener) CloseLtcp(addr string) {
l.lock.Lock()
defer l.lock.Unlock()
delete(l.ltcpConnMap, addr)
}
func (l *LtcpListener) Addr() net.Addr {
return l.conn.LocalAddr()
}
func (l *LtcpListener) closeAllUdpConn() {
l.lock.Lock()
defer l.lock.Unlock()
for name, conn := range l.ltcpConnMap {
if err := conn.Close(); err != nil {
fmt.Printf("close conn %s error: %s\n", name, err)
}
}
}
func (l *LtcpListener) Run() {
data := make([]byte, MAX_PACKAGE)
for {
n, remoteAddr, err := l.conn.ReadFromUDP(data)
if err != nil {
// TODO: 处理错误
l.newLtcpErr <- err
continue
}
l.lock.RLock()
ltcpConn, ok := l.ltcpConnMap[remoteAddr.String()]
l.lock.RUnlock()
if !ok {
ltcpConn = NewLtcpConn(l.conn, remoteAddr, l.CloseLtcp)
l.lock.Lock()
l.ltcpConnMap[remoteAddr.String()] = ltcpConn
l.lock.Unlock()
l.newLtcpConnChan <- ltcpConn
}
bts := make([]byte, n)
copy(bts, data[:n])
ltcpConn.in <- bts
}
}
type LtcpConn struct {
conn *net.UDPConn
remoteAddr *net.UDPAddr
closeFn func(addr string)
recvChan chan []byte
recvErr chan error
sendChan chan []byte
sendErr chan error
in chan []byte
}
func NewUnConn(conn *net.UDPConn,
remoteAddr *net.UDPAddr,
closeFn func(string)) *LtcpConn {
con := &LtcpConn{
conn: conn,
recvChan: make(chan []byte, 1<<16),
recvErr: make(chan error, 2),
sendChan: make(chan []byte, 1<<16),
sendErr: make(chan error, 2),
remoteAddr: remoteAddr,
}
return con
}
// 实现一下 net.Conn 接口
func (c *LtcpConn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *LtcpConn) RemoteAddr() net.Addr {
if c.remoteAddr != nil {
return c.remoteAddr
}
return c.conn.RemoteAddr()
}
func (c *LtcpConn) SetDeadline(t time.Time) error {
return nil
}
func (c *LtcpConn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *LtcpConn) SetWriteDeadline(t time.Time) error {
return nil
}
var _ net.Conn = (*LtcpConn)(nil)
func (c *LtcpConn) Close() error {
if c.remoteAddr != nil {
if c.closeFn != nil {
c.closeFn(c.remoteAddr.String())
}
// TODO: 发送中断链接请求
}
return nil
}
func (c *LtcpConn) send(bts []byte) error {
select {
case c.sendChan <- bts:
return nil
case err := <-c.sendErr:
return err
}
}
func (c *LtcpConn) Write(bts []byte) (n int, err error) {
if err := c.send(bts); err != nil {
return 0, err
}
return len(bts), nil
}
func (c *LtcpConn) Read(bts []byte) (n int, err error) {
select {
case data := <-c.recvChan:
copy(bts, data)
return len(data), nil
case err := <-c.recvErr:
return 0, err
}
}
Conn 发送逻辑
首先为 Conn
添加一些可选的配置项,具体每一项配置的作用详见注释。
|
如果设置了 AutoSend
那么用一个时间轮来出发数据的发送。大致逻辑如下所示
|
sendLoop
发送数据和接受数据的循环都还没有实现,只是定义在这里。接下来先定义一下
sendLoop
的大致逻辑。