流式RPC适合用于需要处理大量数据或需要实时交互的场景,例如音视频流传输,实时聊天等
服务端流式RPC 客户端发送单个请求到服务器,服务器以流式方式返回多个响应,直到完成处理或者达到某个条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 syntax = "proto3" ; option go_package = "hello_server/pb" ; package pb; service Greeter { rpc LotsOfReplies(HelloRequest)returns (stream HelloResponse) {} } message HelloRequest { string name = 1 ; }
之后生成一下代码
1 protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/hello.proto
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package mainimport ( "context" "fmt" "google.golang.org/grpc" "hello_server/pb" "net" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error { words := []string { "你好" , "hello" , "こんにちは" , "안녕하세요" , } for _, word := range words { data := &pb.HelloResponse{ Reply: word + in.GetName(), } if err := stream.Send(data); err != nil { return err } } return nil } func main () { listen, err := net.Listen("tcp" , ":8972" ) if err != nil { fmt.Printf("failed to listen, err: %v" , err) return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) err = s.Serve(listen) if err != nil { fmt.Printf("s.Serve err: %v\n" , err) return } }
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package mainimport ( "context" "flag" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "hello_client/proto" "io" "log" "time" ) var name = flag.String("name" , "Rzyy" , "通过-name说出名称" )func main () { flag.Parse() conn, err := grpc.Dial("127.0.0.1:8972" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("grpc.Dial err: %v" , err) } defer conn.Close() client := proto.NewGreeterClient(conn) runLotsOfReplies(client) } func runLotsOfReplies (c proto.GreeterClient) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() stream, err := c.LotsOfReplies(ctx, &proto.HelloRequest{Name: *name}) if err != nil { log.Fatalf("c.LotsOfReplies failed, err: %v" , err) } for { res, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("c.LotsOfReplies failed, err: %v" , err) } log.Printf("got reply: %q\n" , res.GetReply()) } }
执行结果:
1 2 3 4 2023 /05 /15 20 :49 :43 got reply: "你好Rzyy" 2023 /05 /15 20 :49 :43 got reply: "helloRzyy" 2023 /05 /15 20 :49 :43 got reply: "こんにちはRzyy" 2023 /05 /15 20 :49 :43 got reply: "안녕하세요Rzyy"
客户端流式RPC 客户端以流式方式发送多个请求到服务器,并且服务器返回单个响应。这意味着客户端可以连续地发送多个请求给服务器,而服务器在接收到所有请求后,处理请求并返回一个响应。
在 proto文件中,添加定义一个rpc方法
1 2 3 4 5 6 service Greeter { rpc LotsOfReplies(HelloRequest)returns (stream HelloResponse) {} rpc LotsOfGreeting(stream HelloRequest)returns (HelloResponse) {} }
server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (s *server) LotsOfGreeting(stream pb.Greeter_LotsOfGreetingServer) error { reply := "你好:" for { res, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.HelloResponse{Reply: reply}) } if err != nil { return err } reply += res.GetName() } }
client:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func runLotsOfGreetings (c proto.GreeterClient) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() stream, err := c.LotsOfGreeting(ctx) if err != nil { log.Fatalf("c.LotsOfReplies failed, err: %v" , err) } names := []string {"RZZY" , "日之朝矣" , "KatoMegumi" } for _, name := range names { err := stream.Send(&proto.HelloRequest{Name: name}) if err != nil { log.Fatalf("stream.Send(%v) falied err: %v" , name, err) } } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("c.LotsOfGreetings failed: %v" , err) } log.Printf("got reply: %v" , res.GetReply()) }
执行结果:
1 2023 /05 /15 21 :46 :04 got reply: 你好:RZZY日之朝矣KatoMegumi
双向流式RPC 在双向流式RPC中,客户端和服务器都可以以流式方式同时发送和接收多个请求和响应。这意味着客户端和服务器可以建立一个双向的流通道,通过该通道交换多个请求和响应。
在proto文件中新增内容:
1 2 3 4 5 6 7 8 service Greeter { rpc LotsOfReplies(HelloRequest)returns (stream HelloResponse) {} rpc LotsOfGreeting(stream HelloRequest)returns (HelloResponse) {} rpc BindHello(stream HelloRequest)returns (HelloResponse) {} }
生成一下代码
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (s *server) LotsOfGreetingAndReplies(stream pb.Greeter_LotsOfGreetingAndRepliesServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } reply := magic(in.GetName()) if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil { return err } } } func magic (s string ) string { s = strings.ReplaceAll(s, "吗" , "" ) s = strings.ReplaceAll(s, "吧" , "" ) s = strings.ReplaceAll(s, "你" , "我" ) s = strings.ReplaceAll(s, "?" , "!" ) s = strings.ReplaceAll(s, "?" , "!" ) return s }
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func runLotsOfGreetingAndReplies (c proto.GreeterClient) { ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute) defer cancelFunc() stream, err := c.LotsOfGreetingAndReplies(ctx) if err != nil { log.Fatalf("c.LotsOfGreetingAndReplies err: %v" , err.Error()) } waitc := make (chan struct {}) go func () { for { in, err := stream.Recv() if err == io.EOF { close (waitc) log.Println("断掉了" ) return } if err != nil { log.Fatalf("stream.CloseAndRecv() err:%v" , err.Error()) } log.Printf("Kelee: %s\n" , in.GetReply()) } }() reader := bufio.NewReader(os.Stdin) for { cmd, _ := reader.ReadString('\n' ) cmd = strings.TrimSpace(cmd) if len (cmd) == 0 { continue } if strings.ToUpper(cmd) == "QUIT" || strings.ToUpper(cmd) == "EXIT" { break } err := stream.Send(&proto.HelloRequest{Name: cmd}) if err != nil { log.Fatalf("stream.Send err: %v" , err.Error()) } } stream.CloseSend() <-waitc }
由客户端发送消息,然后服务端回应,像聊天一样,不过例子是一问一答