目录

简单聊一聊gRPC

什么是RPC

常规的客户端和服务端传协议需要创建TCP连接,具体什么协议根据具体的应用场景可以做选择

常见的协议

  • HTTP

RPC是远程过程调用,主要在服务端执行

rpc就是把

  • 上述过程封装下,使其操作更加有优化

  • 使用大家都认可的协议使其规范化

  • 做成一个框架

rpc在执行过程中就像在调用本地方法一样(底层还是TCP),rpc把一些细节给封装了一下

GRPC支持多个语言,go语言的仓库为

http://github.com/grpc/grpc-go

grpc是一个框架,具体的传输数据还是要用到一些相关的戒指

grpc传输所使用的协议为protobuf

protohuf用protoc(一个程序)来编译

protobuf的go的版本安装方法如下所示:

go get github.com/golang/protobuf/protoc-gen-go

protobuf是一个轻便高效的序列化数据结构协议,可以用于网络通信和数据存储

特点:性能高,传输快,维护方便

简单跑个Demo

服务端

创建中间文件

syntax = "proto3";
package services;

message ProdRequest {
  int32 prod_id = 1;
}

message ProdResponse {
  int32 prod_stock = 1;
}

创建如下所示的项目路径

https://cdn.cjpa.top/cdnimages/image-20210103015635075.png

然后在pbfiles目录下执行命令:

protoc --go_out=../ Prod.proto

生成了如下所示的文件

https://cdn.cjpa.top/cdnimages/image-20210103015905780.png

在prod.proto里面添加方法

syntax = "proto3";
package services;

message ProdRequest {
  int32 prod_id = 1;
}

message ProdResponse {
  int32 prod_stock = 1;
}

service ProdService {
  rpc GetProdStock (ProdRequest) returns (ProdResponse);
}

然后重新生成文件

protoc --go_out=plugins=grpc:../services Prod.proto

现在发现里面又多了一些东西

https://cdn.cjpa.top/cdnimages/image-20210116025654884.png

这里有一个接口,我们要在go里面去实现这个接口

https://cdn.cjpa.top/cdnimages/image-20210116025748541.png

新建一个ProdService文件,用来实现services接口

package services

import (
	"context"
)

import (
	"google.golang.org/grpc"
)

type ProdService struct {

}

func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest, opts ...grpc.CallOption) (*ProdResponse, error) {
	return &ProdResponse{ProdStock: 20},nil
}

下面开始写服务端:

package main

import (
	"google.golang.org/grpc"
	"net"
)

import (
	"go-grpc-learning/services"
)

func main() {
	rpcServer := grpc.NewServer()
	services.RegisterProdServiceServer(rpcServer,new(services.ProdService))
	lis, _ := net.Listen("tcp",":8084")
	rpcServer.Serve(lis)
}

这里有一个比较特殊RegisterProdServiceServer函数会提示参数不匹配,在网课中是把RegisterProdServiceServer声明的地方删掉最后一个参数

客户端

客户端的话就把服务端的Prod.pb.go文件拷贝过去就可以了

然后客户端的main函数:

package main

import (
	"context"
	"fmt"
	"log"
)

import (
	"google.golang.org/grpc"
)

import (
	"grpc-client/services"
)

func main() {
	connection, err := grpc.Dial(":8084")
	if err != nil {
		log.Fatal(err)
	}
	defer connection.Close()

	prodClient := services.NewProdServiceClient(connection)
	prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12})
	if  err != nil {
		log.Fatal(err)
	}
	fmt.Println(prodRes.ProdStock)
}

这样子运行会报错:

2021/01/16 14:42:17 grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)

看提示是没有证书,正常来说grpc是需要使用证书的,然后证书的问题下一章节再说,先按照提示设置

grpc.WithInsecure()

第18行改成这样就可以了

	connection, err := grpc.Dial(":8084", grpc.WithInsecure())

自签证书

一般来说在生产环境中是不允许使用自签证书的,在买域名的时候域名服务商一般会送一个证书

这个证书可以找域名服务商申请,一般是免费

加入证书代码:服务端

	creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key")
	if err != nil {
		log.Fatal(err)
	}
	rpcServer := grpc.NewServer(grpc.Creds(creds))

https://cdn.cjpa.top/cdnimages/image-20210116154636989.png

重新运行server,现在再运行client就报错了

2021/01/16 15:47:10 rpc error: code = Unavailable desc = connection closed

client也一样的加入证书

	creds, err := credentials.NewClientTLSFromFile("keys/ssl.crt","www.cjpa.top")
	if err != nil {
		log.Fatal(err)
	}
	connection, err := grpc.Dial(":8084", grpc.WithTransportCredentials(creds))

这个时候运行就可以正常接收到数据了

https://cdn.cjpa.top/cdnimages/image-20210116155354637.png

热身

之前服务端使用的tcp进行连接,现在尝试用一个http看看

  • 为什么要使用httpserver?

    因为有时候一些其他的服务可能没有用到grpc服务,而是用的http服务,为了能够让其他没有用到grpc的client也能调用到我们写的服务,需要写处理http的接口

    代码如下:

    func main() {
    	creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key")
    	if err != nil {
          log.Fatal(err)
    	}
    	rpcServer := grpc.NewServer(grpc.Creds(creds))
    	services.RegisterProdServiceServer(rpcServer,new(services.ProdService))
    	//lis, _ := net.Listen("tcp",":8084")
    	//rpcServer.Serve(lis)
    	mux := http.NewServeMux()
    	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
          rpcServer.ServeHTTP(writer,request)
    	})
    	httpServer := &http.Server{
          Addr : ":8084",
          Handler: mux,
    	}
    	httpServer.ListenAndServe()
    }
    

    https://cdn.cjpa.top/cdnimages/image-20210116160407471.png

用浏览器访问之后发现提示gRPC requires HTTP/2,这是因为grpc提供http服务的时候用的是http2,而普通的浏览器访问时http1.x,因此会有这个提示

解决方案:

这样就变成了用https访问

httpServer.ListenAndServeTLS("keys/ssl.crt","keys/ssl.key")

现在可以正常访问,但是还时优点小问题

https://cdn.cjpa.top/cdnimages/image-20210116161154420.png

下面检查一下看看请求过来的时候用了什么协议

	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		fmt.Println(request.Proto)// 可以打印协议
		rpcServer.ServeHTTP(writer,request)
	})

发现输出的是http/2.0

https://cdn.cjpa.top/cdnimages/image-20210116161412141.png

总体问题不大,这个过会儿再看吧

下面用客户端访问服务端,然后把所有的信息都给打印下来

fmt.Println(request)
&{POST /services.ProdService/GetProdStock HTTP/2.0 2 0 
  map[
  Content-Type:[application/grpc] 
	Te:[trailers] 
	User-Agent:[grpc-go/1.35.0]] 0xc000090210 <nil> -1 [] false www.cjpa.top map[] map[] <nil> map[] 127.0.0.1:53297 /services.ProdService/GetProdStock 0xc000122a50 <nil> <nil> 0xc000096080
}

可以看到Context-Type里面有grpc

后面需要做的就是怎么调用这个grpc

使用自签CA,server,Client证书和双向认证

CA证书

CA证书(root certificate)是属于根证书颁发机构(CA)的公钥证书,用以验证它所签发的证书(客户端、服务端)

生成流程

openssl
genrsa -out ca.key 2048
req -new -x509 -days 3650 -key ca.key -out ca.pem

https://cdn.cjpa.top/cdnimages/image-20210116184649950.png

重新生成服务端证书

genrsa -out server.key 2048
req -new -key server.key -out server.csr
x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in server.csr -out server.pem

https://cdn.cjpa.top/cdnimages/image-20210116170307022.png

https://cdn.cjpa.top/cdnimages/image-20210116185013470.png

上面的过程中都会提示要输入common name,这里统一填写localhost,写成域名也可以

生成客户端证书

ecparam -genkey -name secp384r1 -out client.key
req -new -key client.key -out client.csr
x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in client.csr -out client.pem

然后在程序中重新覆盖server.crt和server.key

服务端覆盖之前

https://cdn.cjpa.top/cdnimages/image-20210116185355527.png

覆盖之后

https://cdn.cjpa.top/cdnimages/image-20210116185543153.png

服务端代码改造

func main() {
	//creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key")
	//if err != nil {
	//	log.Fatal(err)
	//}
	cert, _ := tls.LoadX509KeyPair("cert/server.pem","cert/server.key")
	certPool := x509.NewCertPool()
	ca, _ := ioutil.ReadFile("cert/ca.pem")
	certPool.AppendCertsFromPEM(ca)

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth: tls.RequireAndVerifyClientCert,
		ClientCAs: certPool,
	})

	rpcServer := grpc.NewServer(grpc.Creds(creds))
	services.RegisterProdServiceServer(rpcServer,new(services.ProdService))
	//lis, _ := net.Listen("tcp",":8084")
	//rpcServer.Serve(lis)
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		fmt.Println(request)
		rpcServer.ServeHTTP(writer,request)
	})
	httpServer := &http.Server{
		Addr : ":8084",
		Handler: mux,
	}
	//httpServer.ListenAndServe()
	httpServer.ListenAndServeTLS("cert/server.pem","cert/server.key")
}

改造客户端:

和server差不多,也是把证书文件拷贝过去,不过server.key不能放进去

https://cdn.cjpa.top/cdnimages/image-20210116185748615.png

func main() {

	//creds, err := credentials.NewClientTLSFromFile("keys/ssl.crt","www.cjpa.top")
	//if err != nil {
	//	log.Fatal(err)
	//}

	cert, _ := tls.LoadX509KeyPair("cert/client.pem","cert/client.key")
	certPool := x509.NewCertPool()
	ca, _ := ioutil.ReadFile("cert/ca.pem")
	certPool.AppendCertsFromPEM(ca)

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},//服务端证书
		ServerName: "localhost",//双向验证
		RootCAs: certPool,
	})

	connection, err := grpc.Dial(":8084", grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatal(err)
	}
	defer connection.Close()

	prodClient := services.NewProdServiceClient(connection)
	prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12})
	if  err != nil {
		log.Fatal(err)
	}
	fmt.Println(prodRes.ProdStock)
}

现在服务正常运行了

双向认证下rpc-gateway使用

用gateway来同时提供rpc和http接口

有一个第三方库

https://github.com/grpc-ecosystem/grpc-gateway

对于普通的http请求,在grpc基础之上加一层代理并转发,转变成protobuf访问,然后再把protobuf的响应转化为http响应,返回给客户端

安装:

go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway

然后再拓展里面找到这个文件夹

https://cdn.cjpa.top/cdnimages/image-20210116192546770.png

拷贝过来

https://cdn.cjpa.top/cdnimages/image-20210116192759563.png

https://cdn.cjpa.top/cdnimages/image-20210116194231640.png

一个是正常的的prod.pb,一个是网关

下面开始改造代码

先改造pbfiles/Prod.proto

syntax = "proto3";
package services;
import "google/api/annotations.proto";

message ProdRequest {
  int32 prod_id = 1;
}

message ProdResponse {
  int32 prod_stock = 1;
}

service ProdService {
  rpc GetProdStock (ProdRequest) returns (ProdResponse){
      option (google.api.http) = {
        get: "/v1/prod/{prod_id}"
       };
  };
}

在pbfiles里面打开终端,执行命令

# 这个会生成Prod.pb.go
protoc --go_out=plugins=grpc:../services Prod.proto
# 这会生成Prod.pd.gw.go
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto

编写http网关

func main() {
	// 路由
	gwmux := runtime.NewServeMux()
	// 这个opt指定客户端使用的证书
	opt := []grpc.DialOption{grpc.WithTransportCredentials(helper.GetClientCreds())}
	// RegisterProdServiceHandlerFromEndpoint方法在Prod.pb.gw.go里面,这个Endpoint意思就是grpc的地址,和server.go里面的端口保持一致
	err := services.RegisterProdServiceHandlerFromEndpoint(context.Background(), gwmux, "localhost:8084",opt)
	if err != nil {
		log.Fatal(err)
	}
	httpServer := &http.Server{
		Addr: ":8085",
		Handler: gwmux,
	}
	httpServer.ListenAndServe()
}

这样操作之后,http请求可以通过8085端口来进行访问,然后再转发给本地的8084rpc服务

在浏览器中访问8085:

https://cdn.cjpa.top/cdnimages/image-20210117011302915.png

语法速学

repeated

上节课是传入一个商品id获取一个库存,现在我们来获取一堆商品的库存

还是老规矩,改造代码pbfiles/Pord.proto:

syntax = "proto3";
package services;
import "google/api/annotations.proto";

message ProdRequest {
  int32 prod_id = 1;
}

message QuerySize {
  int32 size = 1;  // 页尺寸,这个1是顺序
}

message ProdResponse {
  int32 prod_stock = 1;
}

/* 
返回一堆商品库存,使用了repeated修饰符
因为grpc是一个跨语言的框架,不能按照go的风格来写,repeated就代表这个属性可以重复
*/
message ProdResponseList {
  repeated ProdResponse prodRes = 1;
}

service ProdService {
  rpc GetProdStock (ProdRequest) returns (ProdResponse){
    option (google.api.http) = {
      get: "/v1/prod/{prod_id}"
     };
  };
  rpc getProdStocks (QuerySize) returns (ProdResponseList) {

  };
}

生成prod.pb.go文件:

protoc --go_out=plugins=grpc:../services Prod.proto

Services/ProdService.go里面也要实现一下上面定义的getProdStocks

func (this *ProdService) GetProdStocks(ctx context.Context,request *QuerySize) (*ProdResponseList, error) {
	Prods := []*ProdResponse{
		&ProdResponse{ProdStock: 28},
		&ProdResponse{ProdStock: 36},
		&ProdResponse{ProdStock: 12},
		&ProdResponse{ProdStock: 78},
	}
	return &ProdResponseList{
		ProdRes: Prods,
	}, nil
}

然后客户端调用:

	res, err := prodClient.GetProdStocks(context.Background(), &services.QuerySize{Size: 6})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(res.ProdRes)

控制台输出如下

https://cdn.cjpa.top/cdnimages/image-20210117015802763.png

enum

上一个例子中我们实现了切片和数组

现在讲一下枚举类型

改造pbfiles/Prod.proto

enum ProdAreas {
  A = 0;
  B = 1;
  C = 2;
}

message ProdRequest {
  int32 prod_id = 1;
  ProdAreas prod_area = 2;
}

重新生成Prod.pb.go

改造服务端代码

func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest) (*ProdResponse, error) {
	var stock int32 = 0
	if request.ProdArea == ProdAreas_A {
		stock = 30
	}else if request.ProdArea == ProdAreas_B {
		stock = 40
	}else if request.ProdArea == ProdAreas_C {
		stock = 50
	}
	return &ProdResponse{ProdStock: stock},nil
}

客户端

prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12, ProdArea: services.ProdAreas_A})
	if  err != nil {
		log.Fatal(err)
	}
	fmt.Println(prodRes.ProdStock)

输出结果如下

https://cdn.cjpa.top/cdnimages/image-20210117020822456.png

导入外部proto

新建一个proto文件,专门放一些实体

https://cdn.cjpa.top/cdnimages/image-20210117022046672.png

syntax = "proto3";
package services;

//商品模型
message ProdModel {
  int32 prod_id = 1;
  string prod_name = 2;
  float prod_price = 3;
}

在Prod.proto里面导入

import "Models.proto";

然后写一个rpc服务

rpc getProdInfo (ProdRequest) returns (ProdModel) {};

生成go文件

protoc --go_out=plugins=grpc:../services Prod.proto
protoc --go_out=plugins=grpc:../services Models.proto

实现getProdInfo

func (this *ProdService) GetProdInfo(ctx context.Context, request *ProdRequest) (*ProdModel, error) {
	ret := ProdModel {
		ProdId: 101,
		ProdName: "测试商品",
		ProdPrice: 20.5,
	}
	return &ret, nil
}

客户端调用

	prod, err := prodClient.GetProdInfo(context.Background(), &services.ProdRequest{ProdId: 12})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(prod.ProdName)

结果如下

https://cdn.cjpa.top/cdnimages/image-20210117022326393.png

日期类型

创建主订单模型

import "google/protobuf/timestamp.proto";
// 主订单模型
message OrderMain {
  int32 order_id = 1;// 订单ID
  string order_no = 2;// 订单号
  int32 user_id = 3;// 购买者Id
  float order_money = 4;// 商品金额
  //这边还需要一个订单时间
  google.protobuf.Timestamp order_time = 5;
}

日期类型在go里面的使用

import "github.com/golang/protobuf/ptypes/timestamp"
t := timestamp.Timestamp{Seconds: time.Now().Unix()}

场景练习

上面的网关用的是get方法,下面用post试试

首先改造proto代码

Pbfiles/Order.proto

syntax = "proto3";
package services;
import "google/api/annotations.proto";
import "Models.proto";

message OrderRequest {
  OrderMain order_main = 1;
}

message OrderResponse {
  string status = 1;
  string message = 2;
}

service OrderService {
  rpc NewOrder(OrderRequest) returns (OrderResponse) {
    option (google.api.http) = {
      post: "/v1/orders"
      body: "order_main"
    };
  };
}

生成新的网关代码,和grpc代码

protoc --go_out=plugins=grpc:../services Prod.proto &&
protoc --go_out=plugins=grpc:../services Models.proto &&
protoc --go_out=plugins=grpc:../services Orders.proto &&
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto &&
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto

改造OrderService.go

func (this *OrderService) NewOrder(ctx context.Context,orderRequest *OrderRequest) (*OrderResponse, error) {
	fmt.Println(orderRequest)
	return &OrderResponse{
		Status: "ok",
		Message: "success",
	},nil
}

编写HttpServer.go(网关代码)

func main() {
	// 路由
	gwmux := runtime.NewServeMux()
	gRpcEndPoint := "localhost:8084"
	// 这个opt指定客户端使用的证书
	opt := []grpc.DialOption{grpc.WithTransportCredentials(helper.GetClientCreds())}
	// 商品服务网关
	// RegisterProdServiceHandlerFromEndpoint方法在Prod.pb.gw.go里面,这个Endpoint意思就是grpc的地址,和server.go里面的端口保持一致
	err := services.RegisterProdServiceHandlerFromEndpoint(context.Background(), gwmux, gRpcEndPoint,opt)
	if err != nil {
		log.Fatal(err)
	}

	// 订单服务网关
	err = services.RegisterOrderServiceHandlerFromEndpoint(context.Background(), gwmux, gRpcEndPoint,opt)
	if err != nil {
		log.Fatal(err)
	}
	httpServer := &http.Server{
		Addr: ":8085",
		Handler: gwmux,
	}
	httpServer.ListenAndServe()
}

postman测试跑一波

https://cdn.cjpa.top/cdnimages/image-20210117171021263.png

服务端

https://cdn.cjpa.top/cdnimages/image-20210117171103294.png

子订单模型

// 主订单模型
message OrderMain {
  int32 order_id = 1;// 订单ID
  string order_no = 2;// 订单号
  int32 user_id = 3;// 购买者Id
  float order_money = 4;// 商品金额
  // 这边还需要一个订单时间
  google.protobuf.Timestamp order_time = 5;
  // 嵌套
  repeated OrderDetail order_detail = 6;
}

// 子订单模型
message OrderDetail {
  int32 detail_id = 1;
  string order_no = 2;
  int32 prod_id = 3;
  float prod_price = 4;
  int32  prod_num = 5;
}

使用第三方库进行验证

https://github.com/envoyproxy/protoc-gen-validate

这个有点鸡肋,但是也挺方便还挺好用

流模式

为什么要使用流模式?

上面的例子传输的数据比较小

基本模式是客户端请求–服务端响应

这种模式有两个问题:

  • 如果传输数据比较大的时候会导致压力陡增
  • 服务端需要等待客户端的包全部发送才能处理以及响应

这样很慢,grpc只吃了流模式

举个例子

假设要从库里去取一批(x万到几十万),批量查询用户的积分

用户模型如下:

message UserInfo{
	int32 user_id = 1;
	int32 user_score = 2;
}

我们心里期望的模式是,每次先处理1w个,然后一万一万的处理,直到结束

服务端流

服务端流就是服务端分批发送

User.proto代码改造

rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);

服务端UserService.go代码改造

//服务端流模式
func (this *UserService) GetUserScoreByServerStream(in *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error {
	var score int32 = 101
	users := make([]*UserInfo, 0)
	for index, user := range in.Users {
		user.UserScore = score
		score ++
		users = append(users, user)
		// 每隔两条发送一次
		if (index + 1) % 2 ==0 && index > 0{
			err := stream.Send(&UserScoreResponse{Users: users})
			if err != nil {
				return err
			}
			users = (users)[0:0]
		}
		// 发送完之后就晴空
		time.Sleep(1 * time.Second)
	}
	if len(users) > 0 {
		err := stream.Send(&UserScoreResponse{ Users:users})
		if err != nil {
			return err
		}
	}
	return nil
}

客户端扫描读取:

stream, err := userClient.GetUserScoreByServerStream(context.Background(), &req)
	if err != nil {
		log.Fatal(err)
	}
	for {
		res,err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(res.Users)
	}

客户端流

客户端分批接收

场景

客户端批量查询用户积分

  • 客户端一次性把用户列表发送过去(客户端获取列表比较慢)
  • 服务端查询比较快

这个时候可以使用客户端流模式,服务端就一直扫描消息,然后扫描到了就发送给客户端

改装User.proto

// 客户端流
rpc GetUserByClientStream (stream UserScoreRequest) returns (UserScoreResponse) {};

UserService.go

// 客户端流
func (this *UserService) GetUserByClientStream(stream UserService_GetUserByClientStreamServer) error {
	var score int32 = 101
	users := make([]*UserInfo, 0)
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&UserScoreResponse{ Users:users}) //关闭流
		}
		if err != nil {
			return err
		}
		for _, user := range req.Users {
			user.UserScore = score // 业务处理
			score++
			users = append(users, user)
		}
	}

	return nil
}

改造客户端代码

	// 客户端流发送
	stream, err := userClient.GetUserByClientStream(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for j := 1; j < 4; j ++{
		var req = services.UserScoreRequest{}
		req.Users = make([]*services.UserInfo,0)
		for i = 1; i < 6; i++ {	// 加了六条消息,假设是一个耗时的过程
			req.Users = append(req.Users, &services.UserInfo{UserId: i})
		}
		err := stream.Send(&req)
		if err != nil {
			log.Println(err)
		}
	}
	// 在结束之后发送关闭信号,并且接收消息
	res,_ := stream.CloseAndRecv()
	fmt.Println(res.Users)
}

双向流

https://cdn.cjpa.top/cdnimages/image-20210117193056258.png

场景:

客户端分批查询用户积分

  • 客户端分批把用户列表发送过去(客户端获取列表比较慢)
  • 服务端查询积分也很慢,所以分批发送过去

此时两边都很慢,那么就使用双向流模式

改造代码user.proto

  // 双向流
  rpc GetUserScoreByTWS (stream UserScoreRequest) returns (stream UserScoreResponse) {};

UserService.go

//双向流
func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error{
	var score int32 = 101
	users := make([]*UserInfo, 0)
	for {
		//接收
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		for _, user := range req.Users {
			user.UserScore = score // 业务处理
			score++
			users = append(users, user)
		}
		//发送
		err = stream.Send(&UserScoreResponse{Users: users})
		if err != nil {
			log.Fatal(err)
		}
		users = (users)[0:0]
	}
	return nil
}

客户端

	// 客户端流发送
	stream, err := userClient.GetUserScoreByTWS(context.Background())
	//stream, err := userClient.GetUserByClientStream(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for j := 1; j < 4; j ++{
		var req = services.UserScoreRequest{}
		req.Users = make([]*services.UserInfo,0)
		for i = 1; i < 6; i++ {	// 加了六条消息,假设是一个耗时的过程
			req.Users = append(req.Users, &services.UserInfo{UserId: i})
		}
		err := stream.Send(&req)
		if err != nil {
			log.Println(err)
		}
		// 在结束之后发送关闭信号,并且接收消息
		res,err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(res.Users)
	}

可以看到,每次读去了5个

https://cdn.cjpa.top/cdnimages/image-20210117194535096.png