GRPC学习
一、GRPC 安装和hello world
1、什么是GRPC
GRPC是谷歌开源的一个可以跨语言RPC的一个框架。可以实现跨语言远程调用
2、安装grpc和代码
快速开始(官网):https://grpc.io/docs/languages/go/quickstart/
go get google.golang.org/grpc
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./hello_grpc.proto
option go_package="./;golang";
案例
hello_grpc.proto
syntax = "proto3";
option go_package="./;golang";
package hello_grpc;
message Req {
string message=1;
}
message Res {
string message=1;
}
service HelloGRPC {
rpc SayHi(Req) returns (Res);
}
成功
3、服务端
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_server/main.go
3.1、取出 server
type server struct {
hello_grpc.UnimplementedHelloGRPCServer
}
3.2、挂载方法
func (s *server) SayHi(ctx context.Context,req *hello_grpc.Req) (res *hello_grpc.Res, err error) {
fmt.Println(req.GetMessage())
return &hello_grpc.Res{Message: "我是从服务端返回的grpc的内容"},nil
}
3.3、注册服务
l,_ := net.Listen("tcp",":8888")
s:=grpc.NewServer()
hello_grpc.RegisterHelloGRPCServer(s,&server{})
3.4、创建监听
s.Serve(l)
案例
import (
"context"
"fmt"
hello_grpc "goclass/grpc_class/pb"
"google.golang.org/grpc"
"net"
)
//1.取出server
type server struct {
hello_grpc.UnimplementedHelloGRPCServer
}
//复写掉了 server接口里面的SayHi方法
func (s *server) SayHi(ctx context.Context, req *hello_grpc.Req) (res *hello_grpc.Res, err error) {
fmt.Println(req.GetMessage())
return &hello_grpc.Res{Message: "我是从服务端返回的grpc内容"}, nil
}
func main() {
l, e := net.Listen("tcp", ":8888")
fmt.Println(e)
s := grpc.NewServer()
hello_grpc.RegisterHelloGRPCServer(s, &server{})
s.Serve(l)
}
4、客户端
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_client/main.go
- 创建一个链接
- new 一个 client
- 调用clint的方法
- 获取返回值
案例
package main
import (
"context"
"fmt"
hello_grpc "goclass/grpc_class/pb"
"google.golang.org/grpc"
)
func main() {
conn, e := grpc.Dial("localhost:8888", grpc.WithInsecure())
fmt.Println(e)
defer conn.Close()
client := hello_grpc.NewHelloGRPCClient(conn)
req, _ := client.SayHi(context.Background(), &hello_grpc.Req{Message: "我从客户端来"})
fmt.Println(req.GetMessage())
}
二、protobuf语法
0、初始化
控制台
go mod init github/pixel
1、syntax 声明语法版本
syntax = "proto3"
2、package 给当前的proto分配包名称
2.1、option
option go_package="包路径(从mod下开始写);别名"
3、message 定义结构
message 名称(大写开头) {
类型 key(下划线) = 唯一(标识)
}
message Req{
string user_name = 1;
}
4、类型 声明参数类型
https://developers.google.com/protocol-buffers/docs/proto3#scalar
5、切片(数组)
需要添加关键字 repeated
6、map
map<key 类型,value 类型> key = 标识;
7、类型嵌套
message Req{
message Person{
string name = 1;
}
int32 id = 1;
Person person = 2;
}
案例
person.proto
syntax = "proto3"; //告诉编译器 用proto3 来解读
package person;
option go_package="github/pixel/pb/person;person";
message Home {
repeated Person persons=1;
message V {
string name = 1;
}
V v = 2;
}
message Person{
string name = 1;
int32 age = 2;
bool sex = 3;
repeated string test = 4;
map <string,string> test_map = 5;
}
build.bat
https://grpc.io/docs/languages/go/quickstart/
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./person/person.proto
8、注释
9、字段作废
message Req{
string user_name = 1;
string one = 2;
string two = 3;
string three = 4;
string four = 5;
string five = 6;
reserved 2,4 to 6;
reserved "one","five"; //one,five无法使用(类似于保留字)
}
10、枚举类型
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
}
//一定要存在一个枚举常量值为 0
options allow_alias = true;
当你的枚举里面存在相同常量值的时候 必须设置这个选项
11、oneof
我们可以再message里用 oneof 来包裹某个 段落
==在这个作用区域中 所有的值只要被设置过了其他的值会被清空==
它不可以被 repeated (不能是数组)
12、
导入定义
import "grpc_class/hello_grpc/test.proto";
java语言不可用 因此 我们就不要用 grpc为了跨语言而定 所以 只要是不通用的 尽量不要用
13、定义服务
service SearchService {
rpc Search(SearchRequest) returns (SearchResponse);
}
service SearchService {
rpc Search(stream SearchRequest) returns (SearchResponse);
}
service SearchService {
rpc Search(SearchRequest) returns (stream SearchResponse);
}
service SearchService {
rpc Search(stream SearchRequest) returns (stream SearchResponse);
}
三、GRPC server 的使用
1、普通服务
rpc SayHiCommon(Req)returns (Res){};
服务端
-
func (s *service) SayHiCommon(ctx context.Context, req *hello_grpc.Req) (res *hello_grpc.Res,err error) { fmt.Println(req.GetName()) return }
-
func(grpc 服务结构体)方法名(ctx,入参指针)(回参指针,错误){ }
客户端
-
req,_:= client.SayHiCommon(context.Background(),&hello_grpc.Req{Name: "普通传入"}) fmt.Println(req)
-
返回,错误:=创建好的客户端连接.方法名(ctx,入参)
案例
person.proto
syntax = "proto3"; //告诉编译器 用proto3 来解读
package person;
option go_package="github/pixel/pb/person;person";
message PersonReq{
string name = 1;
int32 age = 2;
}
message PersonRes{
string name = 1;
int32 age = 2;
}
service SearchService {
rpc Search(PersonReq) returns (PersonRes);
rpc SearchIn(stream PersonReq) returns (PersonRes);
rpc SearchOut(PersonReq) returns (stream PersonRes);
rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}
client
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
)
func main() {
l, _ := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
client := person.NewSearchServiceClient(l)
res, err := client.Search(context.Background(), &person.PersonReq{Name: "我是奇妙"})
if err != nil {
fmt.Println(err)
}
fmt.Println(res) //name:"我收到了我是奇妙的信息"
}
server
package main
import (
"context"
"github/pixel/pb/person"
"google.golang.org/grpc"
"net"
)
type personServe struct {
person.UnimplementedSearchServiceServer
}
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
res := &person.PersonRes{Name: "我收到了" + name + "的信息"}
return res, nil
}
func (*personServe) SearchIn(context.Context, *person.PersonReq) (*person.PersonRes, error) {
return nil, nil
}
func (*personServe) SearchOut(context.Context, *person.PersonReq) (*person.PersonRes, error) {
return nil, nil
}
func (*personServe) SearchIO(context.Context, *person.PersonReq) (*person.PersonRes, error) {
return nil, nil
}
func main() {
l, _ := net.Listen("tcp", ":8888")
s := grpc.NewServer()
person.RegisterSearchServiceServer(s, &personServe{})
s.Serve(l)
}
2、流式传入(客户端流)
rpc SayHiInStream(stream Req)returns (Res){};
服务端
-
func (s *service) SayHiInStream(serve hello_grpc.HelloService_SayHiInStreamServer)(err error){ var res hello_grpc.Res res.NameRes = "success" for{ if tem,err:= serve.Recv();err!=nil{ fmt.Println("传输完了",err) serve.SendAndClose(&res) break }else{ fmt.Println(tem) } } return }
-
func(grpc 服务结构体)方法名(grpc 给定好的一个流式回参server)(错误){ serve.Recv() 进行接收 判断是否 eof 然后 可以在最后关闭之前 用 sendAndClose传输res回去 }
客户端
-
inClient,_ := client.SayHiInStream(context.Background()) i:=1 for{ inClient.Send(&hello_grpc.Req{Sex: "99",Name: "我是流"}) time.Sleep(1 * time.Second) i++ if i>10 { res, err := inClient.CloseAndRecv() fmt.Println(res.NameRes,err) break } }
-
通过client上面的grpc提前剑豪的一个流方法 船舰一个 client
然后对流进行写入操作 在准备关闭的时候 调用 closeAndRecv()关闭并且拿到服务端返回的信息
client
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
"time"
)
func main() {
l, _ := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
client := person.NewSearchServiceClient(l)
c, _ := client.SearchIn(context.Background())
i := 0
for {
if i > 10 {
res, _ := c.CloseAndRecv()
fmt.Println(res)
break
}
time.Sleep(1 * time.Second)
c.Send(&person.PersonReq{Name: "我是进来的信息"})
i++
}
}
name:"完成了"
server
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
"net"
)
type personServe struct {
person.UnimplementedSearchServiceServer
}
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
res := &person.PersonRes{Name: "我收到了" + name + "的信息"}
return res, nil
}
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
for {
req, err := server.Recv()
fmt.Println(req)
if err != nil {
server.SendAndClose(&person.PersonRes{Name: "完成了"})
//fmt.Println(err)
break
}
}
return nil
}
func (*personServe) SearchOut(*person.PersonReq, person.SearchService_SearchOutServer) error {
return nil
}
func (*personServe) SearchIO(person.SearchService_SearchIOServer) error {
return nil
}
func main() {
l, _ := net.Listen("tcp", ":8888")
s := grpc.NewServer()
person.RegisterSearchServiceServer(s, &personServe{})
s.Serve(l)
}
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
name:"我是进来的信息"
<nil>
3、流式返回(服务端流)
rpc SayHiOutStream(Req)returns (stream Res){};
服务端
- func (s 结构体) proto定义的方法名(请求结构,grpc帮我们做好的流式serve) (err error)
- 通过 serve.Send流式发送数据即可
客户端
-
outClient,_ := client.SayHiOutStream(context.Background(),&hello_grpc.Req{Name: "我来拿流了"}) for{ req,err := outClient.Recv() if err!=nil{ fmt.Println(err) outClient.CloseSend() break }else{ fmt.Println(req) } }
-
通过grpc给我们定义好的方法 传入 ctx和入参 然后开始流式接收(grpc给的一个 client) outClient.Recv() 不断读取 等待一个 eof CloseSend()方法可以强制断开来源 但是不安全 issue 查看一下 官方告知 serve只要跳出循环 就可以中断链接
server
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
"net"
"time"
)
type personServe struct {
person.UnimplementedSearchServiceServer
}
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
res := &person.PersonRes{Name: "我收到了" + name + "的信息"}
return res, nil
}
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
for {
req, err := server.Recv()
fmt.Println(req)
if err != nil {
server.SendAndClose(&person.PersonRes{Name: "完成了"})
//fmt.Println(err)
break
}
}
return nil
}
func (*personServe) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
name := req.Name
i := 0
for {
if i > 10 {
break
}
time.Sleep(1 * time.Second)
server.Send(&person.PersonRes{Name: "我拿到了" + name})
i++
}
return nil
}
func (*personServe) SearchIO(person.SearchService_SearchIOServer) error {
return nil
}
func main() {
l, _ := net.Listen("tcp", ":8888")
s := grpc.NewServer()
person.RegisterSearchServiceServer(s, &personServe{})
s.Serve(l)
}
client
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
)
func main() {
l, _ := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
client := person.NewSearchServiceClient(l)
c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "奇妙"})
for {
req, err := c.Recv()
if err != nil {
fmt.Println(req)
break
}
fmt.Println(req)
}
}
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
name:"我拿到了奇妙"
<nil>
4、流式出入(双向流)
rpc SayHiIOStream(stream Req)returns (stream Res){};
服务端
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
"net"
"time"
)
type personServe struct {
person.UnimplementedSearchServiceServer
}
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
res := &person.PersonRes{Name: "我收到了" + name + "的信息"}
return res, nil
}
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
for {
req, err := server.Recv()
fmt.Println(req)
if err != nil {
server.SendAndClose(&person.PersonRes{Name: "完成了"})
//fmt.Println(err)
break
}
}
return nil
}
func (*personServe) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
name := req.Name
i := 0
for {
if i > 10 {
break
}
time.Sleep(1 * time.Second)
server.Send(&person.PersonRes{Name: "我拿到了" + name})
i++
}
return nil
}
func (*personServe) SearchIO(server person.SearchService_SearchIOServer) error {
i := 0
str := make(chan string)
go func() {
for {
i++
req, _ := server.Recv()
if i > 10 {
str <- "结束"
break
}
str <- req.Name
}
}()
for {
s := <-str
if s == "结束" {
server.Send(&person.PersonRes{Name: s})
break
}
server.Send(&person.PersonRes{Name: s})
}
return nil
}
func main() {
l, _ := net.Listen("tcp", ":8888")
s := grpc.NewServer()
person.RegisterSearchServiceServer(s, &personServe{})
s.Serve(l)
}
客户端
package main
import (
"context"
"fmt"
"github/pixel/pb/person"
"google.golang.org/grpc"
"sync"
"time"
)
func main() {
l, _ := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
client := person.NewSearchServiceClient(l)
c, _ := client.SearchIO(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for {
time.Sleep(1 * time.Second)
err := c.Send(&person.PersonReq{Name: "奇妙"})
if err != nil {
wg.Done()
break
}
}
}()
go func() {
for {
req, err := c.Recv()
if err != nil {
fmt.Println(err)
wg.Done()
break
}
fmt.Println(req)
}
wg.Done()
}()
wg.Wait()
}
四、GRPC-gateway 简单使用
文档:Introduction to the gRPC-Gateway | gRPC-Gateway (grpc-ecosystem.github.io)
-
给grpc生成的文件附加一个http1.1的restful供外界访问
-
安装
go get github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway
-
两个依赖包
-
https://github.com/googleapis/googleapis/blob/master/google/api/annotations.proto
-
https://github.com/googleapis/googleapis/blob/master/google/api/http.proto
-
-
生成语句变更 添加 一句gateway生成方式
-
--grpc-gateway_out . --grpc-gateway_opt paths=source_relative
-
//完整: protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out . --grpc-gateway_opt paths=source_relative ./hello_grpc/hello_grpc.proto
-
-
proto变更
-
//server增加option option (google.api.http) = { post: "/api/hello" body: "*" };
-
-
书写网关代码
-
//创建一个客户端 conn, _ := grpc.DialContext( context.Background(), "127.0.0.1:8888", grpc.WithBlock(), grpc.WithInsecure(), )
-
创建一个mux mux := runtime.NewServeMux()
-
//创建http服务 gwServer := &http.Server{ Addr: ":8090", Handler: gwmux, }
-
//注册网关handle _ = hello_grpc.RegisterHelloServiceHandler(context.Background(),gwmux,conn)
-
//监听网关 gwServer.ListenAndServe()
-
评论区