目 录CONTENT

文章目录
Go

GO网络编程-RPC

Sakura
2023-12-27 / 0 评论 / 0 点赞 / 45 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

GO网络编程-RPC

1. RPC

RPC , Remote Procedure Call,远程过程调用。与 HTTP 一致,也是应用层协议。该协议的目标是实现:调用远程过程(方法、函数)就如调用本地方法一致

  • ServiceA 需要调用 ServiceB 的 FuncOnB 函数,对于 ServiceA 来说 FuncOnB 就是远程过程

  • RPC 的目的是让 ServiceA 可以像调用 ServiceA 本地的函数一样调用远程函数 FuncOnB,也就是 ServieA 上代码层面使用:serviceB.FuncOnB() 即可完成调用

  • RPC 是 C/S 模式,调用方为 Client,远程方为 Server

  • RPC 把整体的调用过程,数据打包、网络请求等,封装完毕,在 C、S 两端的 Stub 中。Stub(代码存根)

调用流程:

  1. ServiceA 将调回需求告知 Client Sub

  2. Client Sub 将调用目标(Call ID)、参数数据(params)等调用信息进行打包(序列化),并将打包好的调用信息通过网络传输给 Server Sub

  3. Server Sub 将根据调用信息,调用相应过程。期间涉及到数据的拆包(反序列化)等操作。

  4. 远程过程 FuncOnB 运行,并得到结果,将结果告知 Server Sub

  5. Server Sub 将结果打包,并传输回给 Client Sub

  6. Client Sub 将结果拆包,把最终函数调用的结果告知 ServiceA

2. 使用 go 原生的 rpc 实现 rpc server 和 client

2.1 rpc server

  • rpc server

type HelloWorld struct {
}

func (H *HelloWorld) HelloWorld(req string, resp *string) error {
	*resp = req + "你好,Sakura"
	return nil
}

func RpcServer() {
	// 1. 创建服务
	err := rpc.RegisterName("Hello", &HelloWorld{})
	if err != nil {
		log.Println("注册RPC服务失败:", err)
		return
	}

	// 2.设置监听
	listener, err := net.Listen("tcp", "127.0.0.1:8004")
	if err != nil {
		log.Println("监听失败:", err)
	}
	log.Println("监听端口 127.0.0.1:8004")

	// 3.建立连接
	conn, err := listener.Accept()
	if err != nil {
		log.Println("Accept err:", err)
	}
	fmt.Println("连接已建立")
	// 4.绑定服务,将连接和服务进行绑定
	rpc.ServeConn(conn)
}
  • rpc client

func RpcClient() {
	// 1.用rpc链接服务器 --Dial(拨号)
	conn, err := rpc.Dial("tcp", "127.0.0.1:8004")
	if err != nil {
		log.Println("Dial err:", err)
	}
	defer conn.Close()

	// 2.调用远程函数
	var response string // 接收返回值
	// 指定调用的远程函数
	err = conn.Call("Hello.HelloWorld", "Sakura", &response)
	if err != nil {
		log.Fatalln("Call err:", err)
	}
	fmt.Println(response)
}

2.2 对 rpc 进行封装

2.gRPC

gPRC 官网 https://grpc.io/ 上的 Slogan 是:A high performance, open source universal RPC framework。就是:一个高性能、开源的通用 RPC 框架。

支持多数主流语言:C#、C++、Dart、Go、Java、Kotlin、Node、Objective-C、PHP、Python、Ruby。其中 Go 支持 Windows, Linux, Mac 上的 Go 1.13+ 版本。

gRPC 是一个 Google 开源的高性能远程过程调用 ( RPC ) 框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持有效地连接数据中心内和跨数据中心的服务。它也适用于分布式计算的最后一步,将设备、移动应用程序和浏览器与后端服务接。

3. 准备 GRPC 环境

  1. Protocol Buffer 编译器,protoc,推荐版本3

根据自己的系统选择合适的版本 , 然后解压并配置环境变量

https://github.com/protocolbuffers/protobuf/releases

# 解压到指定目录即可,要保证 protoc/bin 位于环境变量 path 中,可以随处调用
> protoc.exe --version
libprotoc 3.21.4

# 也可以使用go get安装
go get -u google.golang.org/protobuf
  1. Go Plugin , 用于 Protocol Buffer 编译器ibprotoc 3.21.4

# go语言的protobuf 插件
> go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# grpc 的 protobuf 插件
> go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 安装完毕后,要保证 $GOPATH/bin 位于环境变量 path 中

# 测试安装结果
> protoc-gen-go --version
protoc-gen-go.exe v1.32.0  (2023.12.27)
> protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0 (2023.12.27)

4. Protocl Buffer 基本使用

默认情况下,gRPC 使用 Protocol Buffers,这是 Google 用于序列化结构化数据的成熟开源机制(尽管它可以与 JSON 等其他数据格式一起使用)。

Protocol Buffers 的文档:https://developers.google.com/protocol-buffers/docs/overview

  1. 使用 protocol buffers 语法定义消息,消息是用于传递的数据

  2. 使用 protocol buffers 语法定义服务,服务是 RPC 方法的集合,来使用消息

syntax = "proto3";

// 为了生成go代码,需要增加 go_package
option go_package = "./proto-codes";

// 定义用于在服务器之间传递的消息
// 响应的产品信息结构
message ProductResponse{
  int64 id = 1;
  string name = 2;
  bool is_safe = 3;
}

// 请求产品信息时的参数消息
message ProductRequest {
  int64 id = 1;
}

// 定义服务,完成操作
service Product {
  // 定义服务的操作
  // 远程过程,服务器端的过程
  rpc ProductInfo (ProductRequest) returns (ProductResponse);

  // 其他的服务操作
}
  1. 用 Protocol Buffer 编译工具 protoc 来编译,生成对应语言的代码,例如 Go 的代码

# --go_out *.pb.go 目录
# --go-grpc_out *_grpc.pb.go 目录

protoc --go_out=. --go_opt=paths=source_relative  --go-grpc_out=. --go-grpc_opt=paths=source_relative  *.proto

5. 基于 GRPC 的服务间通信案例

  1. 首先定义 Proto

syntax = "proto3";

// 定义生成的go代码所在包
option go_package = ".;pb";

// 定义消息体
message Person {
  // 值为整数,原则上从1开始,也可以不这样。不能使用19000 - 19999
  string name = 1;
  int32  age = 2;a
}

// 定义一个gRPC服务
service StreamGreeter {
  rpc Hello(Person) returns (Person);
}

// 生成go代码和grpc代码
// protoc --go_out=./Person  --go-grpc_out=./Person Person.proto
  • Server

func GRPCSever() {
	// 1.注册服务,启动监听
	grpcserver := grpc.NewServer()
	pb.RegisterHelloServer(grpcserver, &HelloService{})

	// 2.监听
	listen, err := net.Listen("tcp", "127.0.0.1:8005")
	if err != nil {
		log.Println("监听失败,", err)
	}
	defer listen.Close()

	// 3.启动服务
	grpcserver.Serve(listen)
}

// 定义结构实现具体的业务
// 需要嵌入这个结构体
type HelloService struct {
	pb.UnimplementedHelloServer
}

// 重写未定义的方法
func (HelloService) Hello(context.Context, *pb.Person) (*pb.Person, error) {
	person := &pb.Person{
		Name: "Sakura",
		Age:  22,
	}
	return person, nil
}
  • Client

func GRPCClient() {
	// 1.连接gRPC服务
	//grpcConn, err := grpc.Dial("127.0.0.1:8004")
	// 抑制安全策略
	grpcConn, err := grpc.Dial("127.0.0.1:8005", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println("grpc Dial error:", err)
		return
	}
	defer grpcConn.Close()

	// 2.初始化客户端
	grpcClient := pb.NewHelloClient(grpcConn)

	// 3.调用远程服务(函数)
	reply, err := grpcClient.Hello(context.Background(), &pb.Person{Name: "李四", Age: 18})
	if err != nil {
		fmt.Println("reply error:", err)
		return
	}
	fmt.Println("reply:", reply)
}

6. 四种服务方法

// 一元RPC(Unary),客户端向服务器发送单个请求并返回单个响应,就像普通的函数调用一样
rpc SayHello(HelloRequest) returns (HelloResponse);

// 服务器流式处理(Serverstreaming)RPC,客户端向服务器发送请求并获取流以读回一系列消息
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

// 客户端流式处理(Clientstreaming)RPC,客户端写入一系列消息并将其发送到服务器,等待服务器读取并应答
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

// 双向流式处理(Bidirectionalstreaming)RPC,其中双方都使用读写流发送一系列消息。两个流独立运行
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
syntax = "proto3";            // protobuf协议版本

package echo;                 // 当前包
option go_package = ".;proto";// 编译后所在包

// EchoRequest is the request for echo.
message EchoRequest {
  string message = 1;
}

// EchoResponse is the response for echo.
message EchoResponse {
  string message = 1;
}

// Echo is the echo service.
service Echo {
  // UnaryEcho is unary echo.
  // 一元 RPC,客户端向服务器发送单个请求并返回单个响应,就像普通的函数调用一样
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}

  // ServerStreamingEcho is server side streaming.
  // 服务器流式处理 RPC,其中客户端向服务器发送请求并获取流以读回一系列消息。
  // 客户端从返回的流中读取,一直到没有更多消息。gRPC 保证在单个 RPC 调用中对消息进行排序
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}

  // ClientStreamingEcho is client side streaming.
  // 客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器。
  // 客户端完成消息写入后,将等待服务器读取它们并返回其响应。
  // 同样,gRPC 保证在单个 RPC 调用中对消息进行排序。
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}

  // BidirectionalStreamingEcho is bidirectional[ˌbaɪdəˈrekʃənl] streaming.
  // 双向流式处理 RPC,其中双方都使用读写流发送一系列消息。
  // 这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序读取和写入:
  // 例如,服务器可以在写入其响应之前等待接收所有客户端消息,或者它可以交替读取消息然后写入消息,
  // 或者读取和写入的其他一些组合。将保留每个流中消息的顺序。
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}

//protoc --go_out=. --go_opt=paths=source_relative  --go-grpc_out=. --go-grpc_opt=paths=source_relative  *.proto

6.1 一元 RPC

包含了对元数据的处理

  • Server

var port = flag.Int("port", 8006, "Grpc server port")

func GrpcServer() {
	flag.Parse()
	// 监听端口
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Println("监听失败,", err)
	}
	server := grpc.NewServer()

	proto.RegisterEchoServer(server, &Servers{})

	server.Serve(listen)
}

type Servers struct {
	proto.UnimplementedEchoServer
}

// UnaryEcho 一元RPC服务的实现
// 元数据,map[string][string]
// token,timestamp,授权
func (s *Servers) UnaryEcho(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResponse, error) {
	fmt.Println("----------UnaryEcho-----------")
	incomingContext, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		log.Println("miss metadata from context")
	}
	fmt.Println("metadata:", incomingContext)
	return &proto.EchoResponse{Message: req.Message}, nil
}
  • Client

var msg = "Client Data"

func GrpcClient() {
	conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()

	client := proto.NewEchoClient(conn)

	// 1.一元RPC
	unaryEchoWithMetadata(client, msg)
}

// 调用一元RPC方法
func unaryEchoWithMetadata(client proto.EchoClient, msg string) {
	fmt.Println("---------UnaryEcho---------")

	// Pairs封装一个metadata
	md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
	//md.Append("authorization", "token....")
	ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求

	resp, err := client.UnaryEcho(ctx, &proto.EchoRequest{Message: msg})
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("Grpc Server Response:", resp)
	}
}

6.2 服务器流式处理

重点看 Server 和 Client 对于流式消息的处理,服务端向客户端发送流式消息,客户端接收消息

Server : 使用 Send 发送消息

Client:使用recv 接受消息

  • Server

var port = flag.Int("port", 8006, "Grpc server port")

func GrpcServer() {
	flag.Parse()
	// 监听端口
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Println("监听失败,", err)
	}
	server := grpc.NewServer()

	proto.RegisterEchoServer(server, &Servers{})

	server.Serve(listen)
}

type Servers struct {
	proto.UnimplementedEchoServer
}

// 服务端流式处理RPC
func (s *Servers) ServerStreamingEcho(req *proto.EchoRequest, stream proto.Echo_ServerStreamingEchoServer) error {
	fmt.Println("----------ServerStreamingEcho-----------")

	// 服务端向客户单响应使用send函数
	for i := 0; i < 5; i++ { // 以流的形式发送多次
		err := stream.Send(&proto.EchoResponse{Message: req.Message})
		if err != nil {
			return err
		}
	}

	return nil
}
  • Client

var msg = "Client Data"

func GrpcClient() {
	conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()

	client := proto.NewEchoClient(conn)

	// 1.调用一元RPC方法
	unaryEchoWithMetadata(client, msg)

	// 2.调用服务端流式处理
	serverStreaming(client, msg)
}

// 调用服务端流式处理
func serverStreaming(client proto.EchoClient, msg string) {
	fmt.Println("---------serverStreaming Client---------")

	// Pairs封装一个metadata
	md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
	//md.Append("authorization", "token....")
	ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求

	stream, err := client.ServerStreamingEcho(ctx, &proto.EchoRequest{Message: msg})
	if err != nil {
		fmt.Println(err)
	}
	// 从流中接收数据,每次读一个消息
	for {
		// err 读取到末尾会返回 EOF
		recv, err := stream.Recv()
		if err != nil {
			log.Println(err)
			break
		} else if err == io.EOF {
			fmt.Println("finish receive")
			return
		}
		fmt.Println("response is :,", recv.Message)
	}
}

6.3 客户端流式处理

客户端向服务端发送流式消息,服务端接收消息

  • Server

func GrpcServer() {
	flag.Parse()
	// 监听端口
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Println("监听失败,", err)
	}
	server := grpc.NewServer()

	proto.RegisterEchoServer(server, &Servers{})

	server.Serve(listen)
}

type Servers struct {
	proto.UnimplementedEchoServer
}

// ClientStreamingEcho 客户端流式处理
func (s *Servers) ClientStreamingEcho(stream proto.Echo_ClientStreamingEchoServer) error {
	fmt.Println("--------------ClientStreamingEcho------------------------")
	// 接收客户端消息
	for {
		recv, err := stream.Recv()
		if err == io.EOF {
			fmt.Println("receive finish")
			// 发送并关闭
			return stream.SendAndClose(&proto.EchoResponse{Message: "Receive Finish"})
		} else if err != nil {
			log.Println(err)
			return err
		}
		// 打印接收到的消息
		fmt.Println("request reveived: ", recv.Message)
	}
}

  • Client

func GrpcClient() {
	conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()

	client := proto.NewEchoClient(conn)

	// 1.调用一元RPC方法
	unaryEchoWithMetadata(client, msg)

	// 2.调用服务端流式处理
	serverStreaming(client, msg)

	// 3.调用客户端流式处理
	clientStreaming(client, msg)
}

// 客户端流式处理
func clientStreaming(client proto.EchoClient, msg string) {
	fmt.Println("---------clientStreaming---------")

	// Pairs封装一个metadata
	md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
	//md.Append("authorization", "token....")
	ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求

	stream, err := client.ClientStreamingEcho(ctx)
	if err != nil {
		fmt.Println(err)
	}
	// 向服务端循环发送消息
	for i := 0; i < 5; i++ {
		err = stream.Send(&proto.EchoRequest{Message: msg})
		if err != nil {
			log.Println("Fialed to send", err)
		}
	}
	// 关闭并接受
	recv, err := stream.CloseAndRecv()
	if err != nil {
		log.Println(err)
	}
	log.Println("Servere response:", recv)
}

6.4 双向流式处理

双向流式处理,客户端和服务端在接收的同时以发送

重点使用sendclose() 这个方法

  • Server

func GrpcServer() {
	flag.Parse()
	// 监听端口
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Println("监听失败,", err)
	}
	server := grpc.NewServer()

	proto.RegisterEchoServer(server, &Servers{})

	server.Serve(listen)
}

type Servers struct {
	proto.UnimplementedEchoServer
}

// 双向流式处理
func (s *Servers) BidirectionalStreamingEcho(stream proto.Echo_BidirectionalStreamingEchoServer) error {
	fmt.Println("--------------BidirectionalStreamingEcho------------------------")
	// 每接收到客户端的消息,就向客户端发送响应
	for {
		recv, err := stream.Recv()
		if err == io.EOF {
			fmt.Println("消息发送完毕")
			stream.Send(&proto.EchoResponse{Message: "Receive over"})
			return nil
		} else if err != nil {
			log.Println("Receive Failed", err)
			return err
		}
		// 打印接收到消息
		log.Println("Client Message:", recv)
		// 回复响应
		err = stream.Send(&proto.EchoResponse{Message: "Receive success!"})
		if err != nil {
			log.Println("Send Fail,", err)
		}
	}
}

  • Client

func GrpcClient() {
	conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()

	client := proto.NewEchoClient(conn)

	// 4.双向流式处理
	serverclientStreaming(client, msg)
}

// 双向流式处理
func serverclientStreaming(client proto.EchoClient, msg string) {
	fmt.Println("---------serverclientStreaming---------")

	// Pairs封装一个metadata
	md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
	//md.Append("authorization", "token....")
	ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求

	stream, err := client.BidirectionalStreamingEcho(ctx)
	if err != nil {
		fmt.Println(err)
	}

	// 开启一个协程向服务端发送消息
	go func() {
		for i := 0; i < 5; i++ {
			err := stream.Send(&proto.EchoRequest{Message: msg})
			if err != nil {
				log.Println(err)
			}
		}
		stream.CloseSend()
	}()
	//time.Sleep(time.Second * 10)

	//循环接收服务器的消息
	for {
		recv, err1 := stream.Recv()
		if err1 == io.EOF {
			log.Println("----- Receive Finish ------ ")
			break
		} else if err != nil {
			log.Println("Receive Fail,", err)
		}
		log.Println("Server Response:", recv)
	}
}

0

评论区