Reliable-UDP

Reliable-UDP

用 UDP 来实现一个可靠传输,这个问题变相的再考虑,「为什么不用 TCP」,而是要在 UDP 上实现类似的可靠传输。「TCP 有什么缺陷」以至于需要在 UDP 上“自己造一个轮子”。

首先回顾一下可靠传输的 TCP 有哪些性质:

  • 连接建立的「三次握手」和释放的「四次挥手」
  • 拥塞控制
  • 顺序交付
  • 全双工
  • 报文头部(20字节起步)

如果说基于 UDP 实现可靠传输的效果可以比 TCP 做的更好,那么一定是舍弃了 TCP 需要做到的事情。

UDP 优化空间通常来自以下取舍:

  • 放弃可靠性:允许丢包(如视频流、实时语音)
  • 放弃有序性:接受乱序到达(如部分游戏状态同步)
  • 放弃拥塞控制:自主控制速率(如 QUIC 的定制化拥塞控制)
  • 放弃连接管理:无挥手握手(如 DNS)

在设计网络通信时,如果试图容忍丢包,这样看上去非常简单,但是在实际上业务逻辑会变得非常复杂。也就是说,如果丢了某些数据包,也不需要补发、重传,让上层逻辑自己去「容错」或者「忽略」。比如在游戏场景中,有些信息是临时的,比如角色坐标、摄像头朝向、鼠标移动位置,对于这些数据「旧的不一定有用,只要有最新的就行」,这时候丢掉中间几帧没有影响。为了让丢失中间状态也不会出问题,需要每一次都完整的发送全量状态。比如角色的位置不是用「偏移量」而是绝对位置,这样做即使丢失了前几帧,收到这一帧也知道最新的位置。

而如果容忍丢包,不保证数据一定能送到,那就要在业务逻辑中写各种检查、修复、兜底,比如判断状态是否完整、是否合理、是否需要重试等机制。但很多业务并不适合只用「最新状态」来替代「完整过程」,有些状态必须顺序到达,有些中间步骤也很重要,这时候全量同步就不适用了。

如果说在设计协议的时候,明确了不允许丢包,但不强制保证顺序(包可能会乱序到达)。既然可以保证所有的包都能送达,丢失的包也就会通过某种机制重新获得,那么事实上也可以同样保证次序。比如说可以通过包里的序列号来排序处理,顺序也就不是问题了。也就是说只要加了序号和重传,就已经非常接近 TCP 的可靠有序模型了。而这样实现与 TCP 的区别在于,允许业务能提前处理后到的包。DNS 就是一个包无序的典型案例,通过一问一答的方式请求回应。

在网络不稳定的时候,一个长期连接很可能因为某个瞬间的波动或丢包变得卡顿或无效,「短连接」可以及时中断、重新连接,从而更灵活的应对网络问题。每个短连接是一个独立的请求-响应事务,不像 TCP 的长连接那样受之前连接状态影响。短连接的好处在于,失败了没有关系,直接重新发送一个新请求,不用处理复杂的 TCP 状态(如 TIME_WAIT、FIN_WAIT)。这样做就把可靠性从传输层转移到了应用层,这样做的优势在于更灵活,但同时也需要自己去处理重试、幂等问题。

相较与 TCP 的三次握手和四次挥手,在传输小量数据时开销显得过于大,比如只是发送一个 500B 的 JSON 消息,却要 TCP 拿 7 个包起步,这种开销对于小消息是很「浪费」的。

Golang 实现简易 UDP 服务器

UDP 通信

用 Golang 先编写一个简单的 UDP 服务器,用于接受来自客户端的数据:

func main() {
serverAddr, err := net.ResolveUDPAddr("udp", ":8080")
if err != nil {
panic(err)
}

conn, err := net.ListenUDP("udp", serverAddr)
if err != nil {
panic(err)
}
defer conn.Close()

fmt.Println("UDP Server listening on :8080")

buffer := make([]byte, 1024)
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("read error: ", err)
continue
}
fmt.Printf("Received from %s: %s\n", clientAddr, string(buffer[:n]))
}
}

启动之后,用 nc 做一个测试:

# client, another shell 
echo "Hello" | nc -u 127.0.0.1 8080
# go run main.go
UDP Server listening on :8080
Received from 127.0.0.1:53191: Hello

把它封装成一个结构体,以便后续添加功能更加方便:

type (
OnData func(data []byte, addr *net.UDPAddr)
)

type UDPServer struct {
Addr string
Conn *net.UDPConn
OnData OnData
}

func NewUDPServer(addr string, onData OnData) *UDPServer {
return &UDPServer{
Addr: addr,
OnData: onData,
}
}

func (s *UDPServer) Start() error {
udpAddr, err := net.ResolveUDPAddr("udp", s.Addr)
if err != nil {
return err
}

conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return err
}
s.Conn = conn
fmt.Println("UDP Server started on ", s.Addr)

go s.run()

return nil
}

func (s *UDPServer) run() {
buffer := make([]byte, 4096)

for {
n, addr, err := s.Conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("read error: ", err)
continue
}
if s.OnData != nil {
s.OnData(buffer[:n], addr)
}
}
}

func (s *UDPServer) Close() {
s.Conn.Close()
}

包结构设计

目前服务器的发送仅有数据部分,包的概念在可靠传输中比较关键,在传输过程中添加一些必要的信息,比如「确认号」「序列号」等。

先设计一个简单的数据包结构,包含以下字段:

  • 包类型:标识包的类型,比如 PINGEOFACK 等;
  • 序列号:用来标识数据的顺序;
  • 确认号:接收方的响应;
  • 载荷:实际传输的数据;

初步定义一下包的结构:

type PacketType uint8

const (
PackTypeData PacketType = iota
PacketData
)

type Packet struct {
Type PacketType // 包类型
Seq uint32 // 序列号
Ack uint32 // 确认号
Payload []byte // 载荷
}

type PacketOptions struct {
}

func NewPacket(typ PacketType, data []byte, option PacketOptions) *Packet {
packet := &Packet{
Type: typ,
Payload: data,
}
// TODO: 根据 typ 封装信息
return packet
}

// 将 Packet 序列化为字节数组
// TODO: 目前使用 gob 实现序列化,后续优化实现自定义序列化方式
func (p *Packet) Serialize() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(p); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// 从字节数组反序列化为 Packet
func DeserializePacket(data []byte) (*Packet, error) {
var p Packet
buf := bytes.NewReader(data)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&p); err != nil {
return nil, err
}
// TODO: 做一些校验工作
return &p, nil
}

为服务端添加收发 Packet 的功能逻辑:

func (s *UDPServer) sendPacket(conn *net.UDPConn, addr *net.UDPAddr, packet *Packet) error {
data, err := packet.Serialize()
if err != nil {
return err
}

_, err = conn.WriteToUDP(data, addr)
return err
}

func (s *UDPServer) receivePacket(conn *net.UDPConn) (*Packet, *net.UDPAddr, error) {
buf := make([]byte, 1024)
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
return nil, nil, err
}

// 反序列化为 Packet
packet, err := DeserializePacket(buf[:n])
if err != nil {
return nil, nil, err
}

return packet, addr, nil
}

修改一下 OnData 回调函数,现在服务端处理的是以 Packet 为单位的数据,而不是字节流。

type (
OnData func(packet *Packet, addr *net.UDPAddr)
)

编写一个测试案例,不能使用 nc 来继续做测试了,因为使用了 gob 做二进制的编码。

func TestConn(t *testing.T) {
server := NewUDPServer(":8080", func(p *Packet, addr *net.UDPAddr) {
fmt.Printf("packet type: %d, packet data: %s\n", p.Type, string(p.Payload))
})

err := server.Start()
assert.NoError(t, err)

time.Sleep(1 * time.Second)

// 构造一个客户端
severAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:8080")
assert.NoError(t, err)
conn, err := net.DialUDP("udp", nil, severAddr)
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()

// 构建一个 Packet
p := NewPacket(PacketData, []byte("Hello"), DefaultOptions)
data, err := p.Serialize()
assert.NoError(t, err)
_, err = conn.Write(data)
assert.NoError(t, err)

time.Sleep(1 * time.Second)

// 读取来自服务端的消息
buf := make([]byte, 1024)
n, addr, err := conn.ReadFromUDP(buf)
assert.NoError(t, err)
p, err = DeserializePacket(buf[:n])
assert.NoError(t, err)
fmt.Printf("client receive packet from %s, packet type: %d, packet data: %s\n",
addr,
p.Type,
string(p.Payload))
}

/*
UDP Server started on :8080
packet type: 1, packet data: Hello
client receive packet from 127.0.0.1:8080, packet type: 2, packet data: Hello
*/

优化 UDP Server 结构

目前 UDP Server 需要监听连接,同时需要处理读写请求。现在把这部分的逻辑分离,分为 ListenerConnListener 负责监听管理连接,Conn 代表具体某一个连接,Listener 不负责具体的可靠传输的逻辑。

大致实现 ListenerConn,目前 Conn 还没有实现任何关于可靠传输的内容,收发的还是原始的字节流,这部分的逻辑交给专门的类来处理。Conn 还没有实现具体的发送逻辑,只是抽象出了一个 Write 方法调用,具体来说将数据传递给一个 channel,那么需要有 channel 的数据接收者。

const (
	MAX_PACKAGE = 0x7fff
)

// 负责对连接的监听与管理
type LtcpListener struct {
	conn *net.UDPConn
	lock sync.RWMutex

	newLtcpConnChan chan *LtcpConn
	newLtcpErr      chan error

	ltcpConnMap map[string]*LtcpConn
}

func NewLtcpListener(conn *net.UDPConn) *LtcpListener {
	// TODO: 提供 option 选项
	listen := &LtcpListener{
		conn:            conn,
		newLtcpConnChan: make(chan *LtcpConn, 1024),
		newLtcpErr:      make(chan error, 12),
		ltcpConnMap:     make(map[string]*LtcpConn),
	}
	return listen
}

func (l *LtcpListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.newLtcpConnChan:
		return c, nil
	case e := <-l.newLtcpErr:
		return nil, e
	}
}

func (l *LtcpListener) Close() error {
	// 先断开所有维护的连接
	l.closeAllUdpConn()
	l.ltcpConnMap = make(map[string]*LtcpConn)
	return l.conn.Close()
}

func (l *LtcpListener) CloseLtcp(addr string) {
	l.lock.Lock()
	defer l.lock.Unlock()
	delete(l.ltcpConnMap, addr)
}

func (l *LtcpListener) Addr() net.Addr {
	return l.conn.LocalAddr()
}

func (l *LtcpListener) closeAllUdpConn() {
	l.lock.Lock()
	defer l.lock.Unlock()
	for name, conn := range l.ltcpConnMap {
		if err := conn.Close(); err != nil {
			fmt.Printf("close conn %s error: %s\n", name, err)
		}
	}
}

func (l *LtcpListener) Run() {
	data := make([]byte, MAX_PACKAGE)
	for {
		n, remoteAddr, err := l.conn.ReadFromUDP(data)
		if err != nil {
			// TODO: 处理错误
			l.newLtcpErr <- err
			continue
		}
		l.lock.RLock()
		ltcpConn, ok := l.ltcpConnMap[remoteAddr.String()]
		l.lock.RUnlock()

		if !ok {
			ltcpConn = NewLtcpConn(l.conn, remoteAddr, l.CloseLtcp)
            l.lock.Lock()
			l.ltcpConnMap[remoteAddr.String()] = ltcpConn
			l.lock.Unlock()
			l.newLtcpConnChan <- ltcpConn
		}
		bts := make([]byte, n)
		copy(bts, data[:n])
		ltcpConn.in <- bts
	}
}
type LtcpConn struct {
	conn       *net.UDPConn
	remoteAddr *net.UDPAddr
	closeFn    func(addr string)

	recvChan chan []byte
	recvErr  chan error

	sendChan chan []byte
	sendErr  chan error

	in chan []byte
}

func NewUnConn(conn *net.UDPConn,
	remoteAddr *net.UDPAddr,
	closeFn func(string)) *LtcpConn {
	con := &LtcpConn{
		conn:       conn,
		recvChan:   make(chan []byte, 1<<16),
		recvErr:    make(chan error, 2),
		sendChan:   make(chan []byte, 1<<16),
		sendErr:    make(chan error, 2),
		remoteAddr: remoteAddr,
	}
	return con
}

// 实现一下 net.Conn 接口
func (c *LtcpConn) LocalAddr() net.Addr {
	return c.conn.LocalAddr()
}

func (c *LtcpConn) RemoteAddr() net.Addr {
	if c.remoteAddr != nil {
		return c.remoteAddr
	}
	return c.conn.RemoteAddr()
}

func (c *LtcpConn) SetDeadline(t time.Time) error {
	return nil
}

func (c *LtcpConn) SetReadDeadline(t time.Time) error {
	return nil
}

func (c *LtcpConn) SetWriteDeadline(t time.Time) error {
	return nil
}

var _ net.Conn = (*LtcpConn)(nil)

func (c *LtcpConn) Close() error {
	if c.remoteAddr != nil {
		if c.closeFn != nil {
			c.closeFn(c.remoteAddr.String())
		}
		// TODO: 发送中断链接请求
	}
	return nil
}

func (c *LtcpConn) send(bts []byte) error {
	select {
	case c.sendChan <- bts:
		return nil
	case err := <-c.sendErr:
		return err
	}
}

func (c *LtcpConn) Write(bts []byte) (n int, err error) {
	if err := c.send(bts); err != nil {
		return 0, err
	}
	return len(bts), nil
}

func (c *LtcpConn) Read(bts []byte) (n int, err error) {
	select {
	case data := <-c.recvChan:
		copy(bts, data)
		return len(data), nil
	case err := <-c.recvErr:
		return 0, err
	}
}

Conn 发送逻辑

首先为 Conn 添加一些可选的配置项,具体每一项配置的作用详见注释。

type LtcpConnOptions struct {
// 是否启动自动发送,默认为 true
// 设置为 false 需要手动调用发送,才会出发网络传输
AutoSend bool
// 自动发送的时间间隔
SendTick time.Duration
}

var DefaultLtcpConnOptions = LtcpConnOptions{
AutoSend: true,
SendTick: time.Millisecond * 20,
}

如果设置了 AutoSend 那么用一个时间轮来出发数据的发送。大致逻辑如下所示

func (c *LtcpConn) run() {
if c.opts.AutoSend && c.opts.SendTick > 0 {
// 自动发送数据包
go func() {
ticker := time.NewTicker(c.opts.SendTick)
defer ticker.Stop()

for range ticker.C {
// 给 conn 发送一个信号,表示一个时间周期到了
// 可以发送这一批的数据了
c.sendTick <- 1
}
}()
}
go func() {
if c.Connected() {
// 和某个客户端建立的连接
c.connectedRecvLoop()
} else {
// 监听者
c.unconnectedRecvLoop()
}
}()
c.sendLoop()
}

sendLoop 发送数据和接受数据的循环都还没有实现,只是定义在这里。接下来先定义一下 sendLoop 的大致逻辑。