go rpc系列3-grpc
我们用过 rpc 来实现过简单的服务,现在我们改用 gRPC 试试。
安装
在我们的项目根下,在命令行执行 Go 语言的 gRPC 库的安装命令,如下:
$ go get -u google.golang.org/grpc@v1.29.1
使用
hello.proto 文件,新增了 HelloService 接口:
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (String) returns (String);
}
然后使用 protoc-gen-go 内置的 gRPC 插件生成 gRPC 代码:
$ protoc –go_out=plugins=grpc:. ./proto/*.proto
查看生产的 hello.pb.go 文件,gRPC 插件为服务端和客户端生成不同的接口:
// HelloServiceServer is the server API for HelloService service.
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
}
// HelloServiceClient is the client API for HelloService service.
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
}
gRPC 通过 context.Context 参数,为每个方法调用提供了上下文支持。 基于服务端的 HelloServiceServer 接口,我们重新来实现 HelloService 服务:
package main
import (
"context"
"google.golang.org/grpc"
"log"
"net"
pb "rpc/proto" // 设置引用别名
)
type HelloServiceImpl struct{}
func (p *HelloServiceImpl) Hello(ctx context.Context, args *pb.String) (*pb.String, error) {
reply := &pb.String{Value: "hello:" + args.GetValue()}
return reply, nil
}
func main() {
grpcServer := grpc.NewServer()
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
grpcServer.Serve(lis)
}
客户端链接 gRPC 服务:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"log"
pb "rpc/proto" // 设置引用别名
)
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
client := pb.NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &pb.String{Value: "wekenw"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
上面的是grpc 的一元 RPC(Unary RPC)调用方式。
Server-side streaming RPC:服务端流式 RPC
服务器端流式 RPC,单向流,Server 为 Stream,Client 为普通的一元 RPC 请求。
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (String) returns (stream String){};
}
Server:
package main
import (
"google.golang.org/grpc"
"log"
"net"
pb "rpc/proto" // 设置引用别名
"strconv"
)
// HelloServiceImpl 定义我们的服务
type HelloServiceImpl struct{}
//实现Hello方法
func (p *HelloServiceImpl) Hello(req *pb.String, srv pb.HelloService_HelloServer) error {
for n := 0; n < 5; n++ {
// 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes
err := srv.Send(&pb.String{
Value: req.Value + strconv.Itoa(n),
})
if err != nil {
return err
}
}
return nil
}
func main() {
// 新建gRPC服务器实例
grpcServer := grpc.NewServer()
// 在gRPC服务器注册我们的服务
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
Server 端,主要留意 stream.Send 方法,通过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:
消息体(对象)序列化。 压缩序列化后的消息体。 对正在传输的消息体增加 5 个字节的 header(标志位)。 判断压缩 + 序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误。 写入给流的数据集。
Client:
package main
import (
"context"
"google.golang.org/grpc"
"io"
"log"
pb "rpc/proto" // 设置引用别名
)
// SayHello 调用服务端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background(), r)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
// 建立gRPC连接
client := pb.NewHelloServiceClient(conn)
// 创建发送结构体
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}
在 Client 端,主要留意 stream.Recv() 方法,此方法,是对 ClientStream.RecvMsg方法的封装,而 RecvMsg 方法会从流中读取完整的 gRPC 消息体,我们可得知:
RecvMsg 是阻塞等待的。
RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF。
RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误,例如:
io.EOF、io.ErrUnexpectedEOF transport.ConnectionError google.golang.org/grpc/codes(gRPC 的预定义错误码) 需要注意的是,默认的 MaxReceiveMessageSize 值为 1024 1024 4,若有特别需求,可以适当调整。
Client-side streaming RPC:客户端流式 RPC
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求到服务端,服务端发起一次响应给客户端。
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (stream String) returns (String){};
}
server:
package main
import (
"google.golang.org/grpc"
"io"
"log"
"net"
pb "rpc/proto" // 设置引用别名
)
// HelloServiceImpl 定义我们的服务
type HelloServiceImpl struct{}
//实现Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
for {
resp, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.String{Value:"say.hello"})
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
return nil
}
func main() {
// 新建gRPC服务器实例
grpcServer := grpc.NewServer()
// 在gRPC服务器注册我们的服务
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
client:
package main
import (
"context"
"google.golang.org/grpc"
"log"
pb "rpc/proto" // 设置引用别名
)
// SayHello 调用服务端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background())
for n := 0; n < 6; n++ {
_ = stream.Send(r)
}
resp, _ := stream.CloseAndRecv()
log.Printf("resp err: %v", resp)
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
// 建立gRPC连接
client := pb.NewHelloServiceClient(conn)
// 创建发送结构体
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}
在 Server 端的 stream.CloseAndRecv,与 Client 端 stream.SendAndClose 是配套使用的方法。
Bidirectional streaming RPC:双向流式 RPC
双向流式 RPC,由客户端以流式的方式发起请求,服务端也以流式的方式响应请求。
首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)。
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (stream String) returns (stream String){};
}
server:
package main
import (
"google.golang.org/grpc"
"io"
"log"
"net"
pb "rpc/proto" // 设置引用别名
)
// HelloServiceImpl 定义我们的服务
type HelloServiceImpl struct{}
//实现Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
for {
_ = stream.Send(&pb.String{Value: "say.hello"})
resp, err := stream.Recv()
//接收完了返回
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
}
func main() {
// 新建gRPC服务器实例
grpcServer := grpc.NewServer()
// 在gRPC服务器注册我们的服务
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
client:
package main
import (
"context"
"google.golang.org/grpc"
"io"
"log"
pb "rpc/proto" // 设置引用别名
)
// SayHello 调用服务端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background())
for n := 0; n <= 3; n++ {
_ = stream.Send(r)
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp err: %v", resp)
}
_ = stream.CloseSend()
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
// 建立gRPC连接
client := pb.NewHelloServiceClient(conn)
// 创建发送结构体
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}
服务端在循环中接收客户端发来的数据,如果遇到io.EOF表示客户端流被关闭,如果函数退出表示服 务端流关闭。生成返回的数据通过流发送给客户端,双向流数据的发送和接收都是完全独立的行为。需 要注意的是,发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码。