[raw]gorpc笔记
- 简单,相比于xml
- 效率,体积,速度,二进制编码
- 生成数据访问类
- 自动的序列化/反序列化
- 可以作为自描述格式,用于存储
- 作为协议文件的一部分
- 兼容性好,使用tag标记字段,协议新增字段,对于旧的服务,可以跳过不解析
proto3 语法
- message 类型命名驼峰体,字段命名下划线
- enum 类型命名驼峰体,字段命名大写下划线
- 类型
- double float bool
- int32 int64 uint32 uint64
- sint32 sint64 for negative
- fix32 fix64 for large number 2^32 2^56
- sfix32 sfix64 4/8 bytes
- string
- bytes
- repeated 数组
- singular 可选
- 嵌套类型
- package foo.bar; 包名,防止重名
- 增加对json的支持
编码方式,类似于TLV,tag = field_number + wire_type (varints)
- 0 varint [int31,int64,uint32,bool.enum..] TV
- 1/5 64-bit[fix64,double] /32-bit[fix32,float] TV
- 2 Length-delimited [string, bytes,packed repeated, embedded messages] TLV
- 编码策略:variant, zigzag
- variant利用1bit作为msb,因此可以去掉length, 对非符号,小的正数效率高,编码负数效率降低(int32,int64);
- zigzag,有符号整数映射无符号整数后使用variant(sint32,sint64)
- tag, 占用1-15,超出时新增一个字节
demo
// generate protoc --go_out=plugins=grpc:. xxx.proto
syntax = "proto3";
message SongRequest {
string song_name = 1;
}
message SongResponse {
string song_name = 1;
}
enum Foo {
FIRST_VAL = 0;
SECOND_VAL = 1;
}
service SongService {
rpc GetSong(SongRequest) returns (SongResponse)
}
// 双向流
service DoubleService {
rpc Channel(stream String) returns (stream String);
}
// 单向流
service SingleService {
rpc ListFeatures() returns (stream Features);
rpc Record(stream Point) return (string);
}
4. grpc
4.1 安装
- protoc 配置到$GOROOT/bin
- go get -u google.golang.org/grpc
- go get -u github.com/golang/protobuf/protoc-gen-go
- --go_out 指定plugins及输出目录, --proto_path指定proto文件位置
protoc --go_out=plugins=grpc:proto --proto_path=proto hello.proto pubsub.proto
4.2 demo
- 客户端 / 服务端实现
hello.pb
syntax = "proto3";package Hello;
message String {
string value =1;
}
service HelloService {
rpc Hello (String) returns (String);
}
test.go
type HelloSvcImpl struct {}
func (p *HelloSvcImpl) Hello(ctx context.Context, args *Hello.String) (*Hello.String, error) {
reply := &Hello.String{Value: "hello: " + args.GetValue()}
return reply, nil
}
func TestGRpc(t *testing.T) {
go func() {
server := grpc.NewServer()
Hello.RegisterHelloServiceServer(server, &HelloSvcImpl{})
lis, err := net.Listen("tcp", ":2234")
if err != nil {
t.Fatal("fail ", err)
}
server.Serve(lis)
}()
time.Sleep(time.Second * 3)
conn, err := grpc.Dial(":2234", grpc.WithInsecure())
if err != nil {
t.Fatal("cli fail", err)
}
defer conn.Close()
cli := Hello.NewHelloServiceClient(conn)
rep, err := cli.Hello(context.Background(), &Hello.String{Value: "grpc world"})
if err != nil {
t.Fatal("cli fail", err)
}
t.Log(rep.GetValue())
}
- grpc流
service HelloService { rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
// 服务实现func (p *HelloSvcImpl) Channel(stream Hello.HelloService_ChannelServer) error {
for{
args, err := stream.Recv()
if err !=nil{
if err == io.EOF{
return nil
}
return err
}
reply := &Hello.String{Value:"hello: " + args.GetValue()}
err = stream.Send(reply)
if err !=nil{
return err
}
}
}
func TestGpcStream(t *testing.T){
// server
go func() {
server := grpc.NewServer()
Hello.RegisterHelloServiceServer(server, &HelloSvcImpl{})
lis, err := net.Listen("tcp", ":2234")
if err != nil {
t.Fatal("fail ", err)
}
server.Serve(lis)
}()
time.Sleep(time.Second * 3)
// client
conn, err := grpc.Dial(":2234", grpc.WithInsecure())
if err != nil {
t.Fatal("cli fail", err)
}
defer conn.Close()
cli := Hello.NewHelloServiceClient(conn)
stream, err := cli.Channel(context.Background())
if err != nil {
t.Fatal("cli fail", err)
}
//send
go func() {
for count:=0;;count++{
if err :=stream.Send(&Hello.String{
Value:fmt.Sprintf("count %d",count)});err !=nil{
t.Fatal("fail send",err)
}
time.Sleep(time.Second)
}
}()
//recv
ch :=make(chan Hello.String,2)
go func(ch chan <- Hello.String){
for{
reply,err := stream.Recv()
if err !=nil{
if err == io.EOF {
ch <-Hello.String{Value:"Done"}
break
}
t.Fatal("fail recv",err)
}
ch <- *reply
}
}(ch)
for count:=0;count !=10;count++{
msg :=<-ch
t.Log("recv: ",msg.GetValue())
}
}
- 发布/订阅服务
使用docker项目提供的pubsub包实现本地发布订阅
import "hello.proto";service PubSubService{
rpc Publish (String) returns (String);
rpc Subscribe (String) returns (stream String);
}
pubsub.go 发布订阅的服务实现
type PubSubSvc struct { pub *pubsub.Publisher
}
func NewPubSubService() *PubSubSvc{
return &PubSubSvc{
pub:pubsub.NewPublisher(100*time.Millisecond,10),
}
}
func (p *PubSubSvc) Publish(ctx context.Context,
args *Hello.String)(*Hello.String, error){
p.pub.Publish(args.GetValue())
return &Hello.String{},nil
}
func (p *PubSubSvc) Subscribe(args *Hello.String, stream Hello.PubSubService_SubscribeServer) error{
// 注册一个过滤器函数,过滤 topic
ch := p.pub.SubscribeTopic(func(v interface{}) bool{
if k, ok := v.(string);ok{
if strings.HasPrefix(k, args.GetValue()){
return true
}
}
return false
})
for v:=range ch{
if err :=stream.Send(&Hello.String{Value:v.(string)});err !=nil{
return err
}
}
return nil
}
func TestPubSub(t *testing.T){ //server
go func() {
server := grpc.NewServer()
svc := NewPubSubService()
Hello.RegisterPubSubServiceServer(server, svc)
lis, err := net.Listen("tcp", ":2234")
if err != nil {
t.Fatal("fail ", err)
}
server.Serve(lis)
}()
time.Sleep(time.Second)
//publisher
go func(){
conn,err := grpc.Dial(":2234",grpc.WithInsecure())
if err !=nil{
t.Fatal(err)
}
defer conn.Close()
cli := Hello.NewPubSubServiceClient(conn)
for i :=0;i!=3;i++{
_,err := cli.Publish(
context.Background(), &Hello.String{Value:fmt.Sprintf("golang %d",i)})
if err !=nil{
t.Fatal(err)
}
_,err = cli.Publish(
context.Background(), &Hello.String{Value:fmt.Sprintf("docker %d",i)})
if err !=nil{
t.Fatal(err)
}
}
}()
// subscriber
go func() {
conn, err := grpc.Dial(":2234",grpc.WithInsecure())
if err !=nil{
t.Fatal(err)
}
defer conn.Close()
cli := Hello.NewPubSubServiceClient(conn)
// 只订阅主题为golang
stream,err := cli.Subscribe(context.Background(), &Hello.String{Value:"golang"})
if err !=nil{
t.Fatal(err)
}
for {
reply, err := stream.Recv()
if err !=nil{
if err == io.EOF{
break
}
t.Fatal(err)
}
t.Log(reply.GetValue())
}
}()
time.Sleep(time.Second*15)
}
参考
- Go语言高级编程
- csdn 博客
以上是 [raw]gorpc笔记 的全部内容, 来源链接: utcz.com/z/513411.html