目 录CONTENT

文章目录

Go网络编程-连接池

Sakura
2023-09-02 / 0 评论 / 0 点赞 / 8 阅读 / 6653 字 / 正在检测是否收录...

一: 短连接和长连接

程序开发时,连接管理通常分为长短连接:

  • 短链接,连接建立传输数据后立即关闭,下次需要传输数据再次建立连接。

  • 长连接,连接建立完毕后,利用连接多次发送数据,在发送数据的过程中,保持连接不被关闭。最后才关闭连接

1. 短连接

短连接的操作步骤:

1. 建立连接

2. 传输数据

3. 关闭连接

  • 服务端

func Server() {
	// 要监听的地址和端口
	address := ":5678"

	//一:基于某个地址监听
	listener, err := net.Listen(tcp, address)
	if err != nil {
		log.Fatalln(err)
	}
	//defer 关闭监听
	defer listener.Close()
	fmt.Println("服务器正在监听:", listener.Addr())

	// 二:接收连接请求
	// 连接请求不是一个,所以要循环接收
	for true {
		//接受请求
		connection, err1 := listener.Accept()
		if err1 != nil {
			log.Fatalln(err1)
		}
		// 调用处理每个连接的函数
		HandleShortLink(connection)
	}
}

func HandleShortLink(conn net.Conn) {
	fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())
	// 保证连接关闭
	defer conn.Close()
	wg := sync.WaitGroup{}

	wg.Add(1)
	// 发送
	go ShortLinkWrite(conn, &wg)

	wg.Wait()
}

func ShortLinkWrite(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	type Message struct {
		ID      uint   `json:"id,omitempty"`
		Code    string `json:"code,omitempty"`
		Content string `json:"content,omitempty"`
	}
	// 要发送的消息
	message := Message{
		ID:      10,
		Code:    "Contend-code",
		Content: "Golang网络编程",
	}

	encoder := json.NewEncoder(conn)
	// 利用编码器进行编码
	// 编码成功后会写入到conn , 已经完成了conn.Write()
	if err := encoder.Encode(message); err != nil {
		log.Println(err)
		return
	}
	fmt.Println("消息已发送,连接已被关闭")
	return
}
  • 客户端

func ClientMessage() {
	// 服务器地址
	ServerAddress := "127.0.0.1:5678"

	// 建立连接
	connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
	if err != nil {
		log.Fatalln("客户端连接失败")
		return
	}
	//保证关闭
	defer connection.Close()
	fmt.Println("连接被接受,服务器地址为", connection.LocalAddr())
	wg := sync.WaitGroup{}

	// 并发读取
	wg.Add(1)
	go ClientShortLinkRead(connection, &wg)

	wg.Wait()
}

func ClientShortLinkRead(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	type Message struct {
		ID      uint   `json:"id,omitempty"`
		Code    string `json:"code,omitempty"`
		Content string `json:"content,omitempty"`
	}
	// 解码后的消息
	message := Message{}

	for {
		decoder := json.NewDecoder(conn)

		DecoderErr := decoder.Decode(&message)

		// 错误为 io.EOF 时,表示连接被给关闭
		// 服务端关闭,客户端会读到EOF
		if DecoderErr != nil && errors.Is(DecoderErr, io.EOF) {
			log.Println(DecoderErr)
			fmt.Println("连接关闭")
			break
		}
		fmt.Println(message)
	}
}

2. 长连接

在使用长连接时,通常需要使用规律性的发送数据包,以维持在线状态,称为心跳检测。一旦心跳检测不能正确响应,那么就意味着对方(或者己方)不在线,关闭连接。心跳检测用来解决半连接问题。

发送心跳检测的发送端:

  • 客户端

  • 服务端

  • 甚至两端都发

典型的有两种发送策略:

  • 建立连接后,就使用固定的频率发送

  • 一段时间没有接收到数据后,发送检测包。 ( TCP 层的KeepAlive就是该策略 )

心跳检测包的数据内容:

  • 可以无数据

  • 可以携带数据,例如做时钟同步,业务状态同步

  • 典型的 ping pong 结构

心跳检测包是否需要响应:

  • 可以不响应,发送成功即可

  • 可以响应,通常用于同步数据

pingpong 模式,在连接建立后持续心跳:

  • 定时心跳

  • 判断是否接收到正确心跳响应

  • 当 N 次心跳检测失败后,断开连接

  • Server 端,发送 ping 包

  • Client 端,接收到 ping 后,响应 pong

  • Server 端,要检测是否收到了正确的响应 pong , 进而判断是否要主动断开连接

二: 连接池

连接池基本操作

  • 客户端(连接发起端), 通过连接池获取连接,Get操作

  • 当暂时使用完毕后,将连接归还连接池, Put操作

  • 其他客户端,需要连接同样去池中获取, Get操作, 只要连接没有被其他客户端占用,就可以重复使用

  • 少量长链接, 维护大量客户端的目的. 否则,每个客户端,就需要1个连接.

典型的,数据库连接池, 消息队列连接池等.

连接池的必要功能:

  • New, 初始化连接池

  • Get,获取连接

  • Put, 放回连接

连接池接口如下:

type Pool interface {
    // 获取连接
    Get() (net.Conn, error)
    // 放回连接
    Put(conn net.Conn) error
    // 释放池(全部连接)
    Release() error
    // 有效连接个数
    Len() int
}

针对 TCP 连接的生产工厂

type ConnFactory interface {
    // 构造连接
    Factory(addr string) (net.Conn, error)
    // 关闭连接的方法
    Close(net.Conn) error
    // 检查连接是否有效的方法
    Ping(net.Conn) error
}

连接池的典型配置

  1. 初始连接数, 池初始化时的连接数

  2. 最大连接数, 池中最多支持多少连接

  3. 最大空闲连接数, 池中最多有多少可用的连接

  4. 空闲连接超时时间, 多久后空闲连接会被释放

type PoolConfig struct {
    //初始连接数, 池初始化时的连接数
    InitConnNum int
    //最大连接数, 池中最多支持多少连接
    MaxConnNum int
    //最大空闲连接数, 池中最多有多少可用的连接
    MaxIdleNum int
    //空闲连接超时时间, 多久后空闲连接会被释放
    IdleTimeout time.Duration

    // 连接地址
    addr string

    // 连接工厂
    Factory ConnFactory
}

空闲连接管理

// 空闲连接结构
type IdleConn struct {
    // 连接本身
    conn net.Conn
    // 放入池子的时间,用于判断是否空间超时
    putTime time.Time
}

有了基本操作和典型配置后, 连接池的结构设计如下:

  • 要实现TcpPool接口

  • 可以找到Factory

  • 记录当前池信息,例如当前正在使用的连接数量, 空闲的连接队列等

type TcpPool struct {
    // 相关配置
    config *PoolConfig

    // 开放使用的连接数量
    openingConnNum int

    // 空闲的连接队列
    idleList chan *IdleConn

    // 并发安全锁
    mu sync.RWMutex
}

0

评论区