golang使用gRPC
gRPC 是一个 基于 HTTP/2 协议设计的 RPC 框架,它采用了 Protobuf 作为 IDL(交互式数据语言Interactive Data Language)
一、什么是RPC?
RPC 代指远程过程调用(Remote Procedure Call),它的调用包含了传输协议和编码(对象序列号)协议等等。允许运行于一台计算机的程序调用另一台计算机的子程序,而开发人员无需额外地为这个交互作用编程,可以基于 HTTP 协议
例如:A服务器上的应用如何要调用B服务器上的应用,不能直接调用,但是如果通过RPC,就十分方便,使用RPC,就跟本地调用一个函数一样简单
常见 RPC 框架比较:
| 名称 | 跨语言 | 多IDL | 服务治理 | 注册中心 | 服务管理 |
| gRPC | √ | × | × | × | × |
| Thrift | √ | × | × | × | × |
| Rpcx | × | √ | √ | √ | √ |
| Dubbo | × | √ | √ | √ | √ |
RPC优点:简单、通用、安全、效率,gRPC还是语言无关的。你可以用C++作为服务端,使用Golang、Java等作为客户端。为了实现这一点,我们在”定义服务“和在编码和解码的过程中,应该是做到语言无关的,如图:

从上图中可以看出,gRPC使用了Protocol Buffers
gRPC 支持定义 4 种类型的服务方法,分别是简单模式、服务端数据流模式、客户端数据流模式和双向数据流模式
- 简单模式、一元RPC(Simple RPC、Unary RPC)是最简单的 gRPC 模式。客户端发起一次请求,服务端响应一个数据。定义格式为 rpc SayHello (HelloRequest) returns (HelloReply) {},一问一答
- 服务端数据流模式(Server-side streaming RPC):客户端发送一个请求,服务器返回数据流响应,客户端从流中读取数据直到为空。定义格式为 rpc SayHello (HelloRequest) returns (stream HelloReply) {},一问多答
- 客户端数据流模式(Client-side streaming RPC):客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {},多问一答
- 双向数据流模式(Bidirectional streaming RPC):客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互 RPC 框架原理。定义格式为 rpc SayHello (stream HelloRequest) returns (stream HelloReply) {},多问多答
二、Protobuf
Protocol Buffers 是一种与语言、平台无关,可扩展的序列化结构化数据的方法,常用于通信协议,数据存储等等。相较于 JSON、XML,它更小、更快、更简单
可以将其当成一个代码生成工具以及序列化工具,这个工具可以把我们定义的方法,转换成特定语言的代码。比如你定义了一种类型的参数,他会帮你转换成Golang中的struct结构体,你定义的方法,他会帮你转换成func函数,此外,在发送请求和接受响应的时候,这个工具还会完成对应的编码和解码工作,将你即将发送的数据编码成gRPC能够传输的形式,又或者将即将接收到的数据解码为编程语言能够理解的数据格式。
语法如下:
//声明使用的是proto3语法
syntax = "proto3";
//package为可选,用来防止不同的消息名称冲突
package pb;
// ./pb表示生成的go文件位于pb目录下,文件的包名为pb
option go_package = "./pb;pb";
//定义服务,服务中有rpc方法用于接收客户端消息返回服务端消息
service Greeter {
rpc Say (HelloRequest) returns (HelloReply) {}
}
//对应上面方法中的消息接收和消息回复
message HelloRequest {
string name = 1;
string age = 2;
}
message HelloReply {
string hello = 1;
string word = 2;
}
message是关键字,会将message定义的字段转换为HelloRequest和HelloReply结构体。注意,后面的1、2不是赋值,而是在定义这个变量在这个message中的位置,如果有多个就继续写1、2、3、4、5等
string表示定义的字段类型,name、age等表示字段名,实际对应的就是结构体中的字段名和类型,其中字段有三个规则:
required:消息体中必填字段,不设置会导致编解码异常,此关键字可以忽略optional: 消息体中可选字段,通过此字段修饰后生成的是指针repeated: 消息体中可重复字段,重复的值的顺序会被保留,在go中重复字段会被定义为切片
message User {
string username = 1;
int age = 2;
optional string password = 3; // 生成的是指针
repeated string address = 4; // 生产的是切片
}
字段类型映射关系:
| proto类型 | 说明 | 对应python类型 | 对应Go类型 |
| double | float64 | float64 | |
| float | float | float32 | |
| int32 | 使用变长编码,对于负值的效率很低,如果你的域有 可能有负值,请使用sint32替代 | int | int32 |
| int64 | 使用变长编码,对于负值的效率很低,如果你的域有 可能有负值,请使用sint64替代 | int/long[3] | int64 |
| uint32 | 使用变长编码 | int/long | uint32 |
| uint64 | 使用变长编码 | int/long | uint64 |
| sint32 | 使用变长编码,这些编码在负值时比int32高效的多 | int | int32 |
| sint64 | 使用变长编码,有符号的整型值。编码时比通常的 int64高效 | int/long | int64 |
| fixed32 | 总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效 | int | uint32 |
| fixed64 | 总是8个字节,如果数值总是比总是比256大的话,这 个类型会比uint64高效 | int/long | uint64 |
| sfixed32 | 总是4个字节 | int | int32 |
| sfixed64 | 总是8个字节 | int/long | int64 |
| bool | bool | bool | |
| string | 一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本 | str/unicode | string |
| bytes | 可能包含任意顺序的字节数据 | str | []byte |
字段默认值:
| 类型 | 默认值 |
| bool | false |
| 整型 | 0 |
| string | 空字符串”” |
| 枚举enum | 第一个枚举元素的值,Protobuf3强制要求第一个枚举元素值必须是0,所以枚举的默认值就是0 |
| message | 不是null,而是DEFAULT_INSTANCE |
三、实战(简单模式):
实现目标:客户端发送消息给服务端,服务端收到消息后,返回响应给客户端
1、安装grpc核心库,命令如下:
go get google.golang.org/grpc #如果安装失败需要科学上网
也可以先下载出来然后放在GOPATH/pkg目录下,Github地址:
https://github.com/grpc/grpc-go
2、安装protobuf:
wget https://github.com/google/protobuf/releases/download/v21.8/protobuf-all-21.8.zip
unzip protobuf-all-21.8.zip
cd protobuf-all-21.8
./configure
make && make install
3、执行命令查看是否安装成功:
protoc --version
如果报错:protoc: error while loading shared libraries: libprotobuf.so.15: cannot open
shared object file: No such file or directory,执行如下命令:
ldconfig #重新加载动态库
4、安装protoc插件protoc-gen-go、protoc-gen-go-grpc,命令如下:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
执行完成后,在GOPATH/bin目录下可以看到响应的二进制文件,如图:

5、首先编写proto文件,
创建项目test,通过包管理器go mod来管理,执行go mod init test初始化,然后在目录下创建hello.proto文件,内容如图:

注:上图中的service Greeter会转换为hello_grpc.pb.go中的GreeterServer和GreeterClient接口,接口中包含Say()方法
利用hello.proto并且使用如下两个命令生成对应的go文件,如下:
cd /data/go/src/test
protoc --go_out=. ./hello.proto #--go_out指定生成golagn文件
protoc --go-grpc_out=. ./hello.proto
执行后在test目录下将自动创建文件夹pb,里面生成响应的go文件,如图:

在hello.proto中定义的message都会转化为hello.pb.go中的对应名字的结构体,message中的字段也会转换为结构体中对应的字段,service中定义的rpc方法Say也会转换为hello_grpc.pb.go中接口的方法
注:其实是利用protoc-gen-go把定义的与语言无关的hello.proto转换为了go语言的代码,以便server和client直接使用
6、编写服务端,在test下创建server.go,内容如下:
package main
import (
"test/pb"
"log"
"net"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
//定义服务对象来实现.proto文件中的Say方法
//新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type server struct {
pb.UnimplementedGreeterServer
}
//实现服务的接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *server) Say(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Hello: "Hello+" + in.Name}, nil
}
func main() {
//创建监听
lis, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//创建一个grpc服务对象
s := grpc.NewServer()
//将服务对象注册到grpc的内部注册中心
pb.RegisterGreeterServer(s, &server{})
//注册反射服务,一般用来调试,可省略
//reflection.Register(s)
//启动grpc
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
上面例子中演示的是返回HelloReply结构体中字段Hello的值+客户端请求的时候传递的Name字段的值,hello和name对应的是返回和请求的结构体中的字段信息,不可乱写
注:如果运行服务端提示找不到包,可使用go mod tidy来检查依赖包并下载更新即可
7、编写客户端,在test目录下创建client.go,内容如下:
package main
import (
"test/pb"
"context"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
func main() {
// 连接服务端
conn, err := grpc.Dial("192.168.163.128:8888", grpc.WithInsecure())
if err != nil {
grpclog.Fatalln("连接错误:", err)
}
//延迟关闭连接
defer conn.Close()
// 初始化客户端
c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req := &pb.HelloRequest{Name: "gRPC"}
//调用服务端Say方法
res, err := c.Say(ctx, req)
if err != nil {
grpclog.Fatalln("错误:", err)
}
fmt.Println(res.Hello)
}
8、启动服务端和客户端,查看客户端运行结果如下:

注意:可通过go build client.go将客户端编译为二进制后放在其余机器上直接调用即可
上面通过简单方式实现rpc通信就算完成了,但是存在一个安全问题,数据都是明文传输的,有被窃听和篡改的风险,因此建议通过证书的形式进行加密
四、基于CA颁发证书形式实现数据加密传输,双向认证:
CA为根证书颁发机构,主要包含的文件为:公钥和秘钥
1、首先在test/cert目录下创建文件ca.conf,内容如下:
[ req ]
default_bits = 4096
distinguished_name = req_distinguished_name
[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName = Locality Name (eg, city)
localityName_default = ShenZhen
organizationName = Organization Name (eg, company)
organizationName_default = Yunwei
commonName = CommonName (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = gongguan
2、生成ca秘钥,得到ca.key文件,如下:
openssl genrsa -out ca.key 4096
2、基于ca.conf生成ca证书签发请求,得到ca.csr文件,如图:
openssl req -new -sha256 -out ca.csr -key ca.key -config ca.conf
3、生成ca根证书,得到ca.pem文件,如下:
openssl x509 -req -days 3650 -in ca.csr -signkey ca.key -out ca.pem
4、在test/cert目录下新建server.conf,内容如下:
#req 总配置
[ req ]
default_bits = 2048
distinguished_name = req_distinguished_name #使用 req_distinguished_name配置模块
req_extensions = req_ext #使用 req_ext配置模块
[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName = Locality Name (eg, city)
localityName_default = ShenZhen
organizationName = Organization Name (eg, company)
organizationName_default = Yunwei
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = gongguan #这里的Common Name 写主要域名即可(注意:这个域名也要在alt_names的DNS.x里) 此处尤为重要,需要用该服务名字填写到客户端的代码中
[ req_ext ]
subjectAltName = @alt_names #使用 alt_names配置模块,subjectAltName缩写即为SAN
[alt_names]
DNS.1 = localhost
DNS.2 = gongguan.com
DNS.3 = gongguan.com
IP = 127.0.0.1
5、生成秘钥,得到server.key,如下:
openssl genrsa -out server.key 2048
6、生成证书签发请求,得到server.csr文件,如下:
openssl req -new -sha256 -out server.csr -key server.key -config server.conf
7、基于CA证书生成服务端证书,得到server.pem文件,如下:
openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.pem -extensions req_ext -extfile server.conf
8、在test/cert目录下,新建client.conf文件,内容如下:
#req 总配置
[ req ]
default_bits = 2048
distinguished_name = req_distinguished_name #使用 req_distinguished_name配置模块
req_extensions = req_ext #使用 req_ext配置模块
[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = GuangDong
localityName = Locality Name (eg, city)
localityName_default = ShenZhen
organizationName = Organization Name (eg, company)
organizationName_default = Yunwei
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = gongguan #这里的Common Name 写主要域名即可(注意:这个域名也要在alt_names的DNS.x里) 此处尤为重要,需要用该服务名字填写到客户端的代码中
[ req_ext ]
subjectAltName = @alt_names #使用 alt_names配置模块
[alt_names]
DNS.1 = localhost
DNS.2 = gongguan.com
DNS.3 = gongguan.com
IP = 127.0.0.1
9、生成秘钥,得到client.key文件,如下:
openssl ecparam -genkey -name secp384r1 -out client.key
10、生成证书签发请求,得到client.csr文件,如下:
openssl req -new -sha256 -out client.csr -key client.key -config client.conf
11、用CA证书生成客户端证书,得到client.pem文件,如下:
openssl x509 -req -days 3650 -CA ca.pem -CAkey ca.key -CAcreateserial -in client.csr -out client.pem -extensions req_ext -extfile client.conf
- key:服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密
- 证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名
- pem:是基于Base64编码的证书格式,扩展名包括PEM、CRT和CER
12、配置server(服务端)证书,如下:
package main
import (
"Goproject/grpc/pb"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"net"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
)
//服务对象
type server struct {
pb.UnimplementedGreeterServer
}
//实现服务的接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello+" + in.Name}, nil
}
func main() {
//加载证书,读取和解析信息,得到证书公钥、密钥对
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatalf("tls-err")
}
//初始化一个CertPool
certPool := x509.NewCertPool()
//读取并解析pem类型的根证书
ca, err := ioutil.ReadFile("cert/ca.pem")
if err != nil {
log.Fatalf("ioutil.ReadFile err: %v", err)
}
//解析成功后将其加入到池子中
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatalf("certPool.AppendCertsFromPEM err")
}
//构建基于TLS的TransportCredentials选项
c := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert}, //服务端证书链,可以有多个
ClientAuth: tls.RequireAndVerifyClientCert, //要求必须验证客户端证书
ClientCAs: certPool, //设置根证书集合,校验方式使用ClientAuth中设定的模式
})
//创建监听
lis, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//启动一个grpc服务,并开启TLS认证
s := grpc.NewServer(grpc.Creds(c))
//将服务对象注册到grpc的内部注册中心
pb.RegisterGreeterServer(s, &server{})
//注册反射服务
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
13、配置client(客户端)证书,如下:
package main
import (
"Goproject/grpc/pb"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
func main() {
//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, err := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
if err != nil {
log.Fatalf("tls.LoadX509KeyPair err: %v", err)
}
//创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
//这里只能解析pem类型的根证书,因此需要ca.pem
ca, err := ioutil.ReadFile("cert/ca.pem")
if err != nil {
log.Fatalf("ioutil.ReadFile err: %v", err)
}
//解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面的使用
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatalf("certPool.AppendCertsFromPEM err")
}
//构建基于 TLS 的 TransportCredentials 选项
ce := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: "gongguan.com", //注意,这里的参数为配置文件中所允许的ServerName,也就是其中配置的DNS
RootCAs: certPool,
})
// 连接
conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithTransportCredentials(ce))
if err != nil {
grpclog.Fatalln("连接错误:", err)
}
defer conn.Close()
// 初始化客户端
c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用方法
req := &pb.HelloRequest{Name: "gRPC"}
res, err := c.SayHello(ctx, req)
if err != nil {
grpclog.Fatalln("错误:", err)
}
fmt.Println(res.Message)
}
14、运行服务端和客户端文件,命令如下:
go run sever.go
go run client.go
客户端执行如下,通信成功:

注:如果将客户端文件部署到其余的机器,需要将证书文件一起部署过去,否则报错,涉及文件ca.pem、client.key、client.pem
五、流式RPC
上面演示的RPC都是简单的RPC(Simple RPC),在使用 Simple RPC 时,有如下问题:
- 数据包过大造成的瞬时压力
- 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法客户端边发送,服务端边处理)
流式RPC(Streaming RPC)适合场景:
- 大规模数据包
- 实时数据场景
流式RPC分为三种:
- 服务端流模式RPC
- 客户端流模式RPC
- 双向流模式RPC
服务端流模式RPC
1、定义stream_hello.proto文件,注意关键字stream,如下:
syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
//服务端流式,在返回数据前加上stream标识
rpc ServerStream (StreamRequest) returns (stream StreamResponse) {}
}
message StreamRequest {
string name = 1;
}
message StreamResponse {
string message = 1;
}
上面代码定义了服务Greeter以及服务端流式函数ServerStream,返回数据通过stream关键字标识为流
2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:
protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto

查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:




- 客户端流使用
Recv方法接收message,服务端流通过Send方法发送message,默认每次Recv()接收最大消息长度为`1024*1024*4`bytes(4M),单次发送消息最大长度为`math.MaxInt32`bytes,为2048M - 客户端接口的ServerStreamMode方法返回值是Greeter_ServerStreamModeClient(图片太大,上图中未截全),表明生成了一条流,用于接收message,如果有多个方法,那么每个方法各自生成一条流
- 服务端接口的SerrverStreamMode方法的第一个参数为message,第二个参数为Greeter_ServerStreamModeServer(流),可以从流中发送message
3、定义服务端文件stream_server.go,注意看方法ServerStreamMode部分,如下:
package main
import (
"Goproject/grpc/pb"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
)
//服务对象
type streamServer struct {
pb.UnimplementedGreeterServer
}
//实现服务端接口,Say为hello_grpc.go中的接口中的方法,实现了此方法即实现了接口
func (s *streamServer) ServerStreamMode(p *pb.StreamRequest, st pb.Greeter_ServerStreamModeServer) error {
fmt.Println("接收来自客户端的消息......")
name := p.GetName()
for i := 0; i <= 5; i++ {
err := st.Send(&pb.StreamResponse{Message: name})
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second * 1)
}
fmt.Println("服务端流发送结束")
return nil
}
func main() {
//加载证书,读取和解析信息,得到证书公钥、密钥对
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatalf("tls-err")
}
//初始化一个CertPool
certPool := x509.NewCertPool()
//读取并解析pem类型的根证书
ca, err := ioutil.ReadFile("cert/ca.pem")
if err != nil {
log.Fatalf("ioutil.ReadFile err: %v", err)
}
//解析成功后将其加入到池子中
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatalf("certPool.AppendCertsFromPEM err")
}
//构建基于TLS的TransportCredentials选项
c := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert}, //服务端证书链,可以有多个
ClientAuth: tls.RequireAndVerifyClientCert, //要求必须验证客户端证书
ClientCAs: certPool, //设置根证书集合,校验方式使用ClientAuth中设定的模式
})
//创建监听
lis, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//启动一个grpc服务,并开启TLS认证
s := grpc.NewServer(grpc.Creds(c))
//将服务对象注册到grpc的内部注册中心
pb.RegisterGreeterServer(s, &streamServer{})
//注册反射服务
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
4、定义客户端client.go,内容如下:
package main
import (
"Goproject/grpc/pb"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
func main() {
//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, err := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
if err != nil {
log.Fatalf("tls.LoadX509KeyPair err: %v", err)
}
//创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
//这里只能解析pem类型的根证书,因此需要ca.pem
ca, err := ioutil.ReadFile("cert/ca.pem")
if err != nil {
log.Fatalf("ioutil.ReadFile err: %v", err)
}
//解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面的使用
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatalf("certPool.AppendCertsFromPEM err")
}
//构建基于 TLS 的 TransportCredentials 选项
ce := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: "gongguan.com", //注意,这里的参数为配置文件中所允许的ServerName,也就是其中配置的DNS
RootCAs: certPool,
})
// 连接
conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithTransportCredentials(ce))
if err != nil {
grpclog.Fatalln("连接错误:", err)
}
defer conn.Close()
// 初始化客户端
c := pb.NewGreeterClient(conn)
// 调用方法
result, err := c.ServerStreamMode(context.Background(), &pb.StreamRequest{Name: "gongguan"})
if err != nil {
grpclog.Fatalln("错误:", err)
}
fmt.Println("start of stream")
//for后面的语句都被忽略,此时for执行无限循环
for {
//通过Recv()接收message
recvinfo, err := result.Recv()
if err == io.EOF {
fmt.Println("end of stream")
break
}
fmt.Println("结果", recvinfo)
}
result.CloseSend()
}
- 调用方法ServerStreamMode,第一个参数表示超时机制、第二个参数为实例化结构体StreamRequest
- 执行for无限循环,通过Recv()从服务端获取消息流
- io.EOF:读取内容完毕后,再也读不到就会返回io.EOF,此时说明已经全部读取完服务端的流信息,读取不到,客户端发起断开连接
- 接收完成后调用CloseSend()关闭stream,让服务端不会继续产生流消息,如果要继续调用Recv(),会重新激活stream,继续获取消息,此时服务端是不关闭的
5、运行服务端与客户端,输出内容如下:


总结:
- 客户端通过
Recv方法接收来自服务端的message - 服务端通过
Send方法发送响应message
客户端流模式RPC
1、定义stream_hello.proto文件,内容如下:
syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
//客户端流式,在发送数据前加上stream标识
rpc ClientStreamMode (stream StreamRequest) returns (StreamResponse) {}
}
message StreamRequest {
string name = 1;
}
message StreamResponse {
string message = 1;
}
2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:
protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto
3、查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:




- 客户端流使用
Send发送message,使用CloseAndRecv接收message - 客户端ClientStreamMode方法第一个返回值为Greeter_ClientStreamModeClient(截图不全因此看不到),表明生成一条流,用于发送和接收message,如果有多个方法,则每个方法各自生成一条流
- 服务端ClientStreamMode方法入参是Greeter_ClientStreamModeServer(流),具体方法需要用户自行实现,可以从流中接收和发送message
4、定义服务端文件server.go,内容如下:
package main
import (
"Goproject/clientstreamrpc/pb"
"fmt"
"io"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
//服务对象
type streamServer struct {
pb.UnimplementedGreeterServer
}
//调用服务的接口中的方法
func (s *streamServer) ClientStreamMode(cs pb.Greeter_ClientStreamModeServer) error {
fmt.Println("start of stream")
var recv_result []*pb.StreamRequest
for {
recvs, err := cs.Recv() //接收数据,返回类型为StreamRequest
if err == io.EOF {
fmt.Println("end of stream")
break
}
recv_result = append(recv_result, recvs) //添加切片数据
}
//定义变量
a := ""
for k, _ := range recv_result {
a += recv_result[k].Name //合并两个值
}
err := cs.SendAndClose(&pb.StreamResponse{Message: a})
if err != nil {
log.Fatal(err)
}
return nil
}
func main() {
//创建监听
lis, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//启动一个grpc服务
s := grpc.NewServer()
//将服务对象注册到grpc的内部注册中心
pb.RegisterGreeterServer(s, &streamServer{})
//注册反射服务
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
- ClientStreamMode方法中通过Recv接收数据,然后添加到切片中,通过SendAndClose发送并关闭发送方向的流
- 获取的数据添加到切片后,再获取出来,然后合并在一起,并赋值给变量a,然后通过SendAndClose发送到流中
5、定义客户端文件client.go,内容如下:
package main
import (
"Goproject/clientstreamrpc/pb"
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
func main() {
// 连接
conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithInsecure())
if err != nil {
grpclog.Fatalln("连接错误:", err)
}
defer conn.Close()
// 初始化客户端
c := pb.NewGreeterClient(conn)
// 调用方法
result, err := c.ClientStreamMode(context.Background())
if err != nil {
grpclog.Fatalln("错误:", err)
}
//发送两条数据
_ = result.Send(&pb.StreamRequest{Name: "hello"})
time.Sleep(time.Second)
_ = result.Send(&pb.StreamRequest{Name: "word"})
time.Sleep(time.Second)
//发送两次数据后主动关闭流并等待接收来自server端的message
res, err := result.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Println(res)
}
- 通过Send发送两条数据,字段名为hello和word,发送到服务端后合并为helloworld,然后通过CloseAndRecv关闭发送数据的流并接收来自服务端的流信息
6、运行服务端和客户端,查看显示内容如下:


总结:
- 客户端通过
Send方法多次发送message,通过CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message - 服务端通过
Recv方法多次接收message,通过SendAndClose方法发送响应message并关闭发送方向的流
双向流模式RPC
1、定义stream_hello.proto文件,内容如下:
syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
//双向流模式,在发送和接收数据前加上stream标识
rpc TwoWayStreamMode (stream StreamRequest) returns (stream StreamResponse) {}
}
message StreamRequest {
string name = 1;
}
message StreamResponse {
string message = 1;
}
2、根据定义的文件通过protoc编译工具生成go文件与grpc文件,如下:
protoc --go_out=. ./stream_hello.proto
protoc --go-grpc_out=. ./stream_hello.proto
3、查看stream_hello_grpc.pb.go文件,可以看到相关的流信息如下:




- 无论客户端还是服务端流,都包含Send和Recv两种方法,分别用于发送和接收message
- grpc.ClientStream实现了CloseSend()方法,用于关闭流的发送方向
- 客户端TwoWayStreamMode方法的第一个返回值为Greeter_TwoWayStreamModeClient,表明生成一条流,用于发送和接收message,如果有多个方法,则每个方法各自生成一条流
- 服务端TwoWayStreamMode方法的入参是Greeter_TwoWayStreamModeServer(流),具体方法需要用户自行实现,可以从流中发送和接收message
4、定义服务端文件server.go,内容如下:
package main
import (
"Goproject/clientstreamrpc/pb"
"fmt"
"io"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
//服务对象
type streamServer struct {
pb.UnimplementedGreeterServer
}
//调用服务的接口中的方法
func (s *streamServer) TwoWayStreamMode(tw pb.Greeter_TwoWayStreamModeServer) error {
for {
recvs, err := tw.Recv()
//客户端调用CloseSend关闭流后,退出for循环
if err == io.EOF {
break
}
fmt.Println("from stream client question:", recvs)
//将数据发送到流中
err = tw.Send(&pb.StreamResponse{Message: recvs.Name})
if err != nil {
log.Fatal(err)
}
}
return nil
}
func main() {
//创建监听
lis, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//启动一个grpc服务
s := grpc.NewServer()
//将服务对象注册到grpc的内部注册中心
pb.RegisterGreeterServer(s, &streamServer{})
//注册反射服务
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
Recv方法会一直阻塞直到从stream中接收到message,或者直到客户端调用CloseSend方法- 当客户端调用
CloseSend方法时,服务端调用Recv方法会得到io.EOF返回值
5、定义客户端文件client.go,内容如下:
package main
import (
"Goproject/clientstreamrpc/pb"
"context"
"fmt"
"io"
"log"
"strconv"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
func main() {
// 连接
conn, err := grpc.Dial("124.223.49.29:8080", grpc.WithInsecure())
if err != nil {
grpclog.Fatalln("连接错误:", err)
}
defer conn.Close()
// 初始化客户端
c := pb.NewGreeterClient(conn)
// 调用方法
result, err := c.TwoWayStreamMode(context.Background())
if err != nil {
grpclog.Fatalln("错误:", err)
}
fmt.Println("start of stream.....")
//发送两条数据
for i := 0; i <= 10; i++ {
_ = result.Send(&pb.StreamRequest{Name: "hello" + strconv.Itoa(i)})
time.Sleep(time.Second)
}
//发送完成后,调用CloseSend关闭流
err = result.CloseSend()
if err != nil {
log.Fatal(err)
}
//从流中接收数据
for {
res, err := result.Recv()
if err == io.EOF {
fmt.Println("end of stream.....")
break
}
fmt.Println("from stream server answer:", res)
}
}
- 客户端通过
CloseSend方法主动关闭发送方向的流 - 主函数里通过
for循环接收来自服务端的响应message。当客户端主动关闭流,服务端在返回最后一个响应message后客户端通过Recv方法会得到返回值io.EOF(标志着整个流的结束)
6、运行服务端和客户端文件,如图:


总结:
- 无论是客户端还是服务端
Recv方法会一直阻塞直到收到message或者对端关闭stream - 当一方关闭
stream时,对端会返回io.EOF
注意:流也可以配合goroutine实现并发


