golang使用gRPC

gRPC 是一个 基于 HTTP/2 协议设计的 RPC 框架,它采用了 Protobuf 作为 IDL(交互式数据语言Interactive Data Language)

一、什么是RPC

RPC 代指远程过程调用(Remote Procedure Call),它的调用包含了传输协议和编码(对象序列号)协议等等。允许运行于一台计算机的程序调用另一台计算机的子程序,而开发人员无需额外地为这个交互作用编程,可以基于 HTTP 协议

例如:A服务器上的应用如何要调用B服务器上的应用,不能直接调用,但是如果通过RPC,就十分方便,使用RPC,就跟本地调用一个函数一样简单

常见 RPC 框架比较:

名称跨语言多IDL服务治理注册中心服务管理
gRPC××××
Thrift××××
Rpcx×
Dubbo×

RPC优点:简单、通用、安全、效率,gRPC还是语言无关的。你可以用C++作为服务端,使用Golang、Java等作为客户端。为了实现这一点,我们在”定义服务“和在编码和解码的过程中,应该是做到语言无关的,如图:

从上图中可以看出,gRPC使用了Protocol Buffers

gRPC 支持定义 4 种类型的服务方法,分别是简单模式、服务端数据流模式、客户端数据流模式和双向数据流模式

  • 简单模式、一元RPC(Simple RPC、Unary RPC)是最简单的 gRPC 模式。客户端发起一次请求,服务端响应一个数据。定义格式为 rpc SayHello (HelloRequest) returns (HelloReply) {},一问一答
  • 服务端数据流模式(Server-side streaming RPC):客户端发送一个请求,服务器返回数据流响应,客户端从流中读取数据直到为空。定义格式为 rpc SayHello (HelloRequest) returns (stream HelloReply) {},一问多答
  • 客户端数据流模式(Client-side streaming RPC):客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {},多问一答
  • 双向数据流模式(Bidirectional streaming RPC):客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互 RPC 框架原理。定义格式为 rpc SayHello (stream HelloRequest) returns (stream HelloReply) {},多问多答

二、Protobuf

Protocol Buffers 是一种与语言、平台无关,可扩展的序列化结构化数据的方法,常用于通信协议,数据存储等等。相较于 JSON、XML,它更小、更快、更简单

可以将其当成一个代码生成工具以及序列化工具,这个工具可以把我们定义的方法,转换成特定语言的代码。比如你定义了一种类型的参数,他会帮你转换成Golang中的struct结构体,你定义的方法,他会帮你转换成func函数,此外,在发送请求和接受响应的时候,这个工具还会完成对应的编码和解码工作,将你即将发送的数据编码成gRPC能够传输的形式,又或者将即将接收到的数据解码为编程语言能够理解的数据格式。

语法如下:

//声明使用的是proto3语法
syntax = "proto3";
//package为可选,用来防止不同的消息名称冲突
package pb;
// ./pb表示生成的go文件位于pb目录下,文件的包名为pb
option go_package = "./pb;pb";
//定义服务,服务中有rpc方法用于接收客户端消息返回服务端消息
service Greeter {
  rpc Say (HelloRequest) returns (HelloReply) {}
}
//对应上面方法中的消息接收和消息回复
message HelloRequest {
  string name = 1;
  string age = 2;
}
message HelloReply {
  string hello = 1;
  string word = 2;
}

message是关键字,会将message定义的字段转换为HelloRequest和HelloReply结构体。注意,后面的1、2不是赋值,而是在定义这个变量在这个message中的位置,如果有多个就继续写1、2、3、4、5等

string表示定义的字段类型,name、age等表示字段名,实际对应的就是结构体中的字段名和类型,其中字段有三个规则:

  • required:消息体中必填字段,不设置会导致编解码异常,此关键字可以忽略
  • optional: 消息体中可选字段,通过此字段修饰后生成的是指针
  • repeated: 消息体中可重复字段,重复的值的顺序会被保留,在go中重复字段会被定义为切片
message User {
  string username = 1;
  int age = 2;
  optional string password = 3;  // 生成的是指针
  repeated string address = 4;   // 生产的是切片
}

字段类型映射关系:

proto类型说明对应python类型对应Go类型
doublefloat64float64
floatfloatfloat32
int32使用变长编码,对于负值的效率很低,如果你的域有 可能有负值,请使用sint32替代intint32
int64使用变长编码,对于负值的效率很低,如果你的域有 可能有负值,请使用sint64替代int/long[3]int64
uint32使用变长编码int/longuint32
uint64使用变长编码int/longuint64
sint32使用变长编码,这些编码在负值时比int32高效的多intint32
sint64使用变长编码,有符号的整型值。编码时比通常的 int64高效int/longint64
fixed32总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效intuint32
fixed64总是8个字节,如果数值总是比总是比256大的话,这 个类型会比uint64高效int/longuint64
sfixed32总是4个字节intint32
sfixed64总是8个字节int/longint64
boolboolbool
string一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本str/unicodestring
bytes可能包含任意顺序的字节数据str[]byte

字段默认值:

类型默认值
boolfalse
整型0
string空字符串””
枚举enum第一个枚举元素的值,Protobuf3强制要求第一个枚举元素值必须是0,所以枚举的默认值就是0
message不是null,而是DEFAULT_INSTANCE

、实战(简单模式):

实现目标:客户端发送消息给服务端,服务端收到消息后,返回响应给客户端

1、安装grpc核心库,命令如下:

go get google.golang.org/grpc   #如果安装失败需要科学上网

也可以先下载出来然后放在GOPATH/pkg目录下,Github地址:

https://github.com/grpc/grpc-go

2、安装protobuf:

wget https://github.com/google/protobuf/releases/download/v21.8/protobuf-all-21.8.zip
unzip protobuf-all-21.8.zip
cd protobuf-all-21.8
./configure
make && make install

3、执行命令查看是否安装成功:

protoc --version

如果报错:protoc: error while loading shared libraries: libprotobuf.so.15: cannot open
shared object file: No such file or directory,执行如下命令:

ldconfig       #重新加载动态库

4、安装protoc插件protoc-gen-go、protoc-gen-go-grpc,命令如下:

go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

执行完成后,在GOPATH/bin目录下可以看到响应的二进制文件,如图:

5、首先编写proto文件,

创建项目test,通过包管理器go mod来管理,执行go mod init test初始化,然后在目录下创建hello.proto文件,内容如图:

注:上图中的service Greeter会转换为hello_grpc.pb.go中的GreeterServer和GreeterClient接口,接口中包含Say()方法

利用hello.proto并且使用如下两个命令生成对应的go文件,如下:

cd /data/go/src/test
protoc --go_out=. ./hello.proto        #--go_out指定生成golagn文件
protoc --go-grpc_out=. ./hello.proto

执行后在test目录下将自动创建文件夹pb,里面生成响应的go文件,如图:

在hello.proto中定义的message都会转化为hello.pb.go中的对应名字的结构体,message中的字段也会转换为结构体中对应的字段,service中定义的rpc方法Say也会转换为hello_grpc.pb.go中接口的方法

注:其实是利用protoc-gen-go把定义的与语言无关的hello.proto转换为了go语言的代码,以便serverclient直接使用

6、编写服务端,在test下创建server.go,内容如下:

package main

import (
        "test/pb"
        "log"
        "net"

        "golang.org/x/net/context"
        "google.golang.org/grpc"
        "google.golang.org/grpc/reflection"
)

//定义服务对象来实现.proto文件中的Say方法
//新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type server struct {
        pb.UnimplementedGreeterServer
}

//实现服务的接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *server) Say(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
        return &pb.HelloReply{Hello: "Hello+" + in.Name}, nil
}

func main() {
        //创建监听
        lis, err := net.Listen("tcp", ":8888")
        if err != nil {
                log.Fatalf("failed to listen: %v", err)
        }
        //创建一个grpc服务对象
        s := grpc.NewServer()
        //将服务对象注册到grpc的内部注册中心
        pb.RegisterGreeterServer(s, &server{})

        //注册反射服务,一般用来调试,可省略
        //reflection.Register(s)
        //启动grpc
        if err := s.Serve(lis); err != nil {
                log.Fatalf("failed to serve: %v", err)
        }
}

上面例子中演示的是返回HelloReply结构体中字段Hello的值+客户端请求的时候传递的Name字段的值,hello和name对应的是返回和请求的结构体中的字段信息,不可乱写

注:如果运行服务端提示找不到包,可使用go mod tidy来检查依赖包并下载更新即可

7、编写客户端,在test目录下创建client.go,内容如下:

package main

import (
	"test/pb"
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
)

func main() {
	// 连接服务端
	conn, err := grpc.Dial("192.168.163.128:8888", grpc.WithInsecure())
	if err != nil {
		grpclog.Fatalln("连接错误:", err)
	}

        //延迟关闭连接
	defer conn.Close()
	// 初始化客户端
	c := pb.NewGreeterClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

        req := &pb.HelloRequest{Name: "gRPC"}
         //调用服务端Say方法
	res, err := c.Say(ctx, req)
	if err != nil {
		grpclog.Fatalln("错误:", err)
	}

	fmt.Println(res.Hello)
}

8、启动服务端和客户端,查看客户端运行结果如下:

注意:可通过go build client.go将客户端编译为二进制后放在其余机器上直接调用即可

上面通过简单方式实现rpc通信就算完成了,但是存在一个安全问题,数据都是明文传输的,有被窃听和篡改的风险,因此建议通过证书的形式进行加密

四、基于CA颁发证书形式实现数据加密传输,双向认证:

CA为根证书颁发机构,主要包含的文件为:公钥和秘钥

1、首先在test/cert目录下创建文件ca.conf,内容如下:

[ req ]
default_bits       = 4096
distinguished_name = req_distinguished_name

[ req_distinguished_name ]
countryName                 = Country Name (2 letter code)
countryName_default         = CN
stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName                = Locality Name (eg, city)
localityName_default        = ShenZhen
organizationName            = Organization Name (eg, company)
organizationName_default    = Yunwei
commonName                  = CommonName (e.g. server FQDN or YOUR name)
commonName_max              = 64
commonName_default          = gongguan

2、生成ca秘钥,得到ca.key文件,如下:

openssl genrsa -out ca.key 4096

2、基于ca.conf生成ca证书签发请求,得到ca.csr文件,如图:

openssl req -new -sha256 -out ca.csr -key ca.key -config ca.conf

3、生成ca根证书,得到ca.pem文件,如下:

openssl x509 -req -days 3650 -in ca.csr -signkey ca.key -out ca.pem

4、在test/cert目录下新建server.conf,内容如下:

#req 总配置
[ req ]
default_bits       = 2048
distinguished_name = req_distinguished_name  #使用 req_distinguished_name配置模块
req_extensions     = req_ext  #使用 req_ext配置模块

[ req_distinguished_name ]
countryName                 = Country Name (2 letter code)
countryName_default         = CN
stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName                = Locality Name (eg, city)
localityName_default        = ShenZhen
organizationName            = Organization Name (eg, company)
organizationName_default    = Yunwei
commonName                  = Common Name (e.g. server FQDN or YOUR name)
commonName_max              = 64
commonName_default          = gongguan    #这里的Common Name 写主要域名即可(注意:这个域名也要在alt_names的DNS.x里) 此处尤为重要,需要用该服务名字填写到客户端的代码中

[ req_ext ]
subjectAltName = @alt_names #使用 alt_names配置模块,subjectAltName缩写即为SAN

[alt_names]
DNS.1   = localhost
DNS.2   = gongguan.com
DNS.3   = gongguan.com
IP      = 127.0.0.1

5、生成秘钥,得到server.key,如下:

openssl genrsa -out server.key 2048

 6、生成证书签发请求,得到server.csr文件,如下:

openssl req -new -sha256 -out server.csr -key server.key -config server.conf

7、基于CA证书生成服务端证书,得到server.pem文件,如下:

openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.pem -extensions req_ext -extfile server.conf

8、在test/cert目录下,新建client.conf文件,内容如下:

#req 总配置
[ req ]
default_bits       = 2048
distinguished_name = req_distinguished_name  #使用 req_distinguished_name配置模块
req_extensions     = req_ext  #使用 req_ext配置模块

[ req_distinguished_name ]
countryName                 = Country Name (2 letter code)
countryName_default         = CN
stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName                = Locality Name (eg, city)
localityName_default        = ShenZhen
organizationName            = Organization Name (eg, company)
organizationName_default    = Yunwei
commonName                  = Common Name (e.g. server FQDN or YOUR name)
commonName_max              = 64
commonName_default          = gongguan    #这里的Common Name 写主要域名即可(注意:这个域名也要在alt_names的DNS.x里) 此处尤为重要,需要用该服务名字填写到客户端的代码中

[ req_ext ]
subjectAltName = @alt_names #使用 alt_names配置模块

[alt_names]
DNS.1   = localhost
DNS.2   = gongguan.com
DNS.3   = gongguan.com
IP      = 127.0.0.1

9、生成秘钥,得到client.key文件,如下:

openssl ecparam -genkey -name secp384r1 -out client.key

10、生成证书签发请求,得到client.csr文件,如下:

openssl req -new -sha256 -out client.csr -key client.key -config client.conf

11、用CA证书生成客户端证书,得到client.pem文件,如下:

openssl x509 -req -days 3650 -CA ca.pem -CAkey ca.key -CAcreateserial -in client.csr -out client.pem -extensions req_ext -extfile client.conf
  • key:服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密
  • 证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名
  • pem:是基于Base64编码的证书格式,扩展名包括PEM、CRT和CER

12、配置server(服务端)证书,如下:

package main

import (
	"Goproject/grpc/pb"
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"net"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/reflection"
)

//服务对象
type server struct {
	pb.UnimplementedGreeterServer
}

//实现服务的接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello+" + in.Name}, nil
}

func main() {
	//加载证书,读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
	if err != nil {
		log.Fatalf("tls-err")
	}
	//初始化一个CertPool
	certPool := x509.NewCertPool()
	//读取并解析pem类型的根证书
	ca, err := ioutil.ReadFile("cert/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}
	//解析成功后将其加入到池子中
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}
	//构建基于TLS的TransportCredentials选项
	c := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},        //服务端证书链,可以有多个
		ClientAuth:   tls.RequireAndVerifyClientCert, //要求必须验证客户端证书
		ClientCAs:    certPool,                       //设置根证书集合,校验方式使用ClientAuth中设定的模式
	})
	//创建监听
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	//启动一个grpc服务,并开启TLS认证
	s := grpc.NewServer(grpc.Creds(c))
	//将服务对象注册到grpc的内部注册中心
	pb.RegisterGreeterServer(s, &server{})

	//注册反射服务
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

13、配置client(客户端)证书,如下:

package main

import (
	"Goproject/grpc/pb"
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
)

func main() {
	//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
	if err != nil {
		log.Fatalf("tls.LoadX509KeyPair err: %v", err)
	}
	//创建一个新的、空的 CertPool
	certPool := x509.NewCertPool()
	//这里只能解析pem类型的根证书,因此需要ca.pem
	ca, err := ioutil.ReadFile("cert/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}
	//解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面的使用
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}
	//构建基于 TLS 的 TransportCredentials 选项
	ce := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ServerName:   "gongguan.com", //注意,这里的参数为配置文件中所允许的ServerName,也就是其中配置的DNS
		RootCAs:      certPool,
	})
	// 连接
	conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithTransportCredentials(ce))
	if err != nil {
		grpclog.Fatalln("连接错误:", err)
	}
	defer conn.Close()

	// 初始化客户端
	c := pb.NewGreeterClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 调用方法
	req := &pb.HelloRequest{Name: "gRPC"}
	res, err := c.SayHello(ctx, req)
	if err != nil {
		grpclog.Fatalln("错误:", err)
	}

	fmt.Println(res.Message)
}

14、运行服务端和客户端文件,命令如下:

go run sever.go
go run client.go

客户端执行如下,通信成功:

注:如果将客户端文件部署到其余的机器,需要将证书文件一起部署过去,否则报错,涉及文件ca.pem、client.key、client.pem

五、流式RPC

上面演示的RPC都是简单的RPC(Simple RPC),在使用 Simple RPC 时,有如下问题:

  • 数据包过大造成的瞬时压力
  • 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法客户端边发送,服务端边处理)

流式RPC(Streaming RPC)适合场景:

  • 大规模数据包
  • 实时数据场景

流式RPC分为三种:

  • 服务端流模式RPC
  • 客户端流模式RPC
  • 双向流模式RPC

服务端流模式RPC

1、定义stream_hello.proto文件,注意关键字stream,如下:

syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
  //服务端流式,在返回数据前加上stream标识
  rpc ServerStream (StreamRequest) returns (stream StreamResponse) {}
}
message StreamRequest {
  string name = 1;
}
message StreamResponse {
  string message = 1;
}

上面代码定义了服务Greeter以及服务端流式函数ServerStream,返回数据通过stream关键字标识为流

2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:

protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto

查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:

  • 客户端流使用Recv方法接收message,服务端流通过Send方法发送message,默认每次Recv()接收最大消息长度为`1024*1024*4`bytes(4M),单次发送消息最大长度为`math.MaxInt32`bytes,为2048M
  • 客户端接口的ServerStreamMode方法返回值是Greeter_ServerStreamModeClient(图片太大,上图中未截全),表明生成了一条流,用于接收message,如果有多个方法,那么每个方法各自生成一条流
  • 服务端接口的SerrverStreamMode方法的第一个参数为message,第二个参数为Greeter_ServerStreamModeServer(流),可以从流中发送message

3、定义服务端文件stream_server.go,注意看方法ServerStreamMode部分,如下:

package main

import (
	"Goproject/grpc/pb"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/reflection"
)

//服务对象
type streamServer struct {
	pb.UnimplementedGreeterServer
}

//实现服务端接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *streamServer) ServerStreamMode(p *pb.StreamRequest, st pb.Greeter_ServerStreamModeServer) error {
	fmt.Println("接收来自客户端的消息......")
	name := p.GetName()
	for i := 0; i <= 5; i++ {
		err := st.Send(&pb.StreamResponse{Message: name})
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second * 1)
	}
	fmt.Println("服务端流发送结束")
	return nil
}
func main() {
	//加载证书,读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
	if err != nil {
		log.Fatalf("tls-err")
	}
	//初始化一个CertPool
	certPool := x509.NewCertPool()
	//读取并解析pem类型的根证书
	ca, err := ioutil.ReadFile("cert/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}
	//解析成功后将其加入到池子中
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}
	//构建基于TLS的TransportCredentials选项
	c := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},        //服务端证书链,可以有多个
		ClientAuth:   tls.RequireAndVerifyClientCert, //要求必须验证客户端证书
		ClientCAs:    certPool,                       //设置根证书集合,校验方式使用ClientAuth中设定的模式
	})
	//创建监听
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	//启动一个grpc服务,并开启TLS认证
	s := grpc.NewServer(grpc.Creds(c))
	//将服务对象注册到grpc的内部注册中心
	pb.RegisterGreeterServer(s, &streamServer{})

	//注册反射服务
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

4、定义客户端client.go,内容如下:

package main

import (
	"Goproject/grpc/pb"
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"io/ioutil"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
)

func main() {
	//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
	if err != nil {
		log.Fatalf("tls.LoadX509KeyPair err: %v", err)
	}
	//创建一个新的、空的 CertPool
	certPool := x509.NewCertPool()
	//这里只能解析pem类型的根证书,因此需要ca.pem
	ca, err := ioutil.ReadFile("cert/ca.pem")
	if err != nil {
		log.Fatalf("ioutil.ReadFile err: %v", err)
	}
	//解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面的使用
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		log.Fatalf("certPool.AppendCertsFromPEM err")
	}
	//构建基于 TLS 的 TransportCredentials 选项
	ce := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ServerName:   "gongguan.com", //注意,这里的参数为配置文件中所允许的ServerName,也就是其中配置的DNS
		RootCAs:      certPool,
	})
	// 连接
	conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithTransportCredentials(ce))
	if err != nil {
		grpclog.Fatalln("连接错误:", err)
	}
	defer conn.Close()

	// 初始化客户端
	c := pb.NewGreeterClient(conn)
	// 调用方法
	result, err := c.ServerStreamMode(context.Background(), &pb.StreamRequest{Name: "gongguan"})
	if err != nil {
		grpclog.Fatalln("错误:", err)
	}
	fmt.Println("start of stream")
	//for后面的语句都被忽略,此时for执行无限循环
	for {
		//通过Recv()接收message
		recvinfo, err := result.Recv()
		if err == io.EOF {
			fmt.Println("end of stream")
			break
		}
		fmt.Println("结果", recvinfo)
	}
	result.CloseSend()
}
  • 调用方法ServerStreamMode,第一个参数表示超时机制、第二个参数为实例化结构体StreamRequest
  • 执行for无限循环,通过Recv()从服务端获取消息流
  • io.EOF:读取内容完毕后,再也读不到就会返回io.EOF,此时说明已经全部读取完服务端的流信息,读取不到,客户端发起断开连接
  • 接收完成后调用CloseSend()关闭stream,让服务端不会继续产生流消息,如果要继续调用Recv(),会重新激活stream,继续获取消息,此时服务端是不关闭的

5、运行服务端与客户端,输出内容如下:

总结:

  • 客户端通过Recv方法接收来自服务端的message
  • 服务端通过Send方法发送响应message

客户端流模式RPC

1、定义stream_hello.proto文件,内容如下:

syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
  //客户端流式,在发送数据前加上stream标识
  rpc ClientStreamMode (stream StreamRequest) returns (StreamResponse) {}
}
message StreamRequest {
  string name = 1;
}
message StreamResponse {
  string message = 1;
}

2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:

protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto

3、查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:

  • 客户端流使用Send发送message,使用CloseAndRecv接收message
  • 客户端ClientStreamMode方法第一个返回值为Greeter_ClientStreamModeClient(截图不全因此看不到),表明生成一条流,用于发送和接收message,如果有多个方法,则每个方法各自生成一条流
  • 服务端ClientStreamMode方法入参是Greeter_ClientStreamModeServer(流),具体方法需要用户自行实现,可以从流中接收和发送message

4、定义服务端文件server.go,内容如下:

package main

import (
	"Goproject/clientstreamrpc/pb"
	"fmt"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

//服务对象
type streamServer struct {
	pb.UnimplementedGreeterServer
}

//调用服务的接口中的方法
func (s *streamServer) ClientStreamMode(cs pb.Greeter_ClientStreamModeServer) error {
	fmt.Println("start of stream")
	var recv_result []*pb.StreamRequest
	for {
		recvs, err := cs.Recv() //接收数据,返回类型为StreamRequest
		if err == io.EOF {
			fmt.Println("end of stream")
			break
		}
		recv_result = append(recv_result, recvs) //添加切片数据
	}
	//定义变量
	a := ""
	for k, _ := range recv_result {
		a += recv_result[k].Name //合并两个值
	}
	err := cs.SendAndClose(&pb.StreamResponse{Message: a})
	if err != nil {
		log.Fatal(err)
	}
	return nil
}
func main() {
	//创建监听
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	//启动一个grpc服务
	s := grpc.NewServer()
	//将服务对象注册到grpc的内部注册中心
	pb.RegisterGreeterServer(s, &streamServer{})

	//注册反射服务
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
  • ClientStreamMode方法中通过Recv接收数据,然后添加到切片中,通过SendAndClose发送并关闭发送方向的流
  • 获取的数据添加到切片后,再获取出来,然后合并在一起,并赋值给变量a,然后通过SendAndClose发送到流中

5、定义客户端文件client.go,内容如下:

package main

import (
	"Goproject/clientstreamrpc/pb"
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
)

func main() {
	// 连接
	conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithInsecure())
	if err != nil {
		grpclog.Fatalln("连接错误:", err)
	}
	defer conn.Close()

	// 初始化客户端
	c := pb.NewGreeterClient(conn)
	// 调用方法
	result, err := c.ClientStreamMode(context.Background())
	if err != nil {
		grpclog.Fatalln("错误:", err)
	}
	//发送两条数据
	_ = result.Send(&pb.StreamRequest{Name: "hello"})
	time.Sleep(time.Second)
	_ = result.Send(&pb.StreamRequest{Name: "word"})
	time.Sleep(time.Second)
	//发送两次数据后主动关闭流并等待接收来自server端的message
	res, err := result.CloseAndRecv()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(res)
}
  • 通过Send发送两条数据,字段名为hello和word,发送到服务端后合并为helloworld,然后通过CloseAndRecv关闭发送数据的流并接收来自服务端的流信息

6、运行服务端和客户端,查看显示内容如下:

总结:

  • 客户端通过Send方法多次发送message,通过CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message
  • 服务端通过Recv方法多次接收message,通过SendAndClose方法发送响应message并关闭发送方向的流

双向流模式RPC

1、定义stream_hello.proto文件,内容如下:

syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
  //双向流模式,在发送和接收数据前加上stream标识
  rpc TwoWayStreamMode (stream StreamRequest) returns (stream StreamResponse) {}
}
message StreamRequest {
  string name = 1;
}
message StreamResponse {
  string message = 1;
}

2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:

protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto

3、查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:

  • 无论客户端还是服务端流,都包含Send和Recv两种方法,分别用于发送和接收message
  • grpc.ClientStream实现了CloseSend()方法,用于关闭流的发送方向
  • 客户端TwoWayStreamMode方法的第一个返回值为Greeter_TwoWayStreamModeClient,表明生成一条流,用于发送和接收message,如果有多个方法,则每个方法各自生成一条流
  • 服务端TwoWayStreamMode方法的入参是Greeter_TwoWayStreamModeServer(流),具体方法需要用户自行实现,可以从流中发送和接收message

4、定义服务端文件server.go,内容如下:

package main

import (
	"Goproject/clientstreamrpc/pb"
	"fmt"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

//服务对象
type streamServer struct {
	pb.UnimplementedGreeterServer
}

//调用服务的接口中的方法
func (s *streamServer) TwoWayStreamMode(tw pb.Greeter_TwoWayStreamModeServer) error {
	for {
		recvs, err := tw.Recv()
		//客户端调用CloseSend关闭流后,退出for循环
		if err == io.EOF {
			break
		}
		fmt.Println("from stream client question:", recvs)
		//将数据发送到流中
		err = tw.Send(&pb.StreamResponse{Message: recvs.Name})
		if err != nil {
			log.Fatal(err)
		}
	}
	return nil
}
func main() {
	//创建监听
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	//启动一个grpc服务
	s := grpc.NewServer()
	//将服务对象注册到grpc的内部注册中心
	pb.RegisterGreeterServer(s, &streamServer{})

	//注册反射服务
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
  • Recv方法会一直阻塞直到从stream中接收到message,或者直到客户端调用CloseSend方法
  • 当客户端调用CloseSend方法时,服务端调用Recv方法会得到io.EOF返回值

5、定义客户端文件client.go,内容如下:

package main

import (
	"Goproject/clientstreamrpc/pb"
	"context"
	"fmt"
	"io"
	"log"
	"strconv"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
)

func main() {
	// 连接
	conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithInsecure())
	if err != nil {
		grpclog.Fatalln("连接错误:", err)
	}
	defer conn.Close()

	// 初始化客户端
	c := pb.NewGreeterClient(conn)
	// 调用方法
	result, err := c.TwoWayStreamMode(context.Background())
	if err != nil {
		grpclog.Fatalln("错误:", err)
	}
	fmt.Println("start of stream.....")
	//发送两条数据
	for i := 0; i <= 10; i++ {
		_ = result.Send(&pb.StreamRequest{Name: "hello" + strconv.Itoa(i)})
		time.Sleep(time.Second)
	}
	//发送完成后,调用CloseSend关闭流
	err = result.CloseSend()
	if err != nil {
		log.Fatal(err)
	}
	//从流中接收数据
	for {
		res, err := result.Recv()
		if err == io.EOF {
			fmt.Println("end of stream.....")
			break
		}
		fmt.Println("from stream server answer:", res)
	}
}
  • 客户端通过CloseSend方法主动关闭发送方向的流
  • 主函数里通过for循环接收来自服务端的响应message。当客户端主动关闭流,服务端在返回最后一个响应message后客户端通过Recv方法会得到返回值io.EOF(标志着整个流的结束)

6、运行服务端和客户端文件,如图:

总结:

  • 无论是客户端还是服务端Recv方法会一直阻塞直到收到message或者对端关闭stream
  • 当一方关闭stream时,对端会返回io.EOF

注意:流也可以配合goroutine实现并发

标签