首页>>后端>>Golang->Consul 入门

Consul 入门

时间:2023-11-30 本站 点击:0

前言

假如我有钱,我想买一个降噪耳机,我应该哪里买? 答案很简单,可以去京东或者线下实体店。 那如果把这个问题映射到微服务架构中:我打开京东,选中某款耳机进入详情页浏览,我可以看到这款耳机的价格、库存、规格、评价等。以我的理解,这个链路应该是这样的:

暂定这个系统由3个微服务组成:商品详情服务、库存服务、评价服务。

商品详情服务:聚合端上用户看到的所有信息

库存服务:维护商品的库存信息、规格信息、价格信息

评价服务:维护用户对商品的评价

微服务的目的是为了基于松耦合高内聚将单体服务进行拆分,然后将个服务进行多副本部署(我们甚至不知道它会被部署到哪里,实体机?虚拟机?容器?云上?)以达到高可用的目的。这也要付出点代价,商品详情服务需要知道:库存服务和评价服务在哪里?

由此,我们将继续学习 Consul 这款不错的服务发现工具,通前面的学习,我们已经对 Consul 的原理、使用、搭建有了认知。本次将学习:Consul 如何在 gRPC 构建的微服务网络环境中做一名合格的“指路人”。

编写一个 Go gRPC 服务

gRPC 是由 Google 开发并开源的RPC框架,详见官网。我们将通过官网的指导来编写一个简单的 go gRPC 服务

获取样例代码

克隆grpc-go仓库

$gitclone-bv1.29.1.0https://github.com/grpc/grpc-go

切换到样例代码目录

$cdcdgrpc-go/examples/helloworld

目录结构如下:

├──greeter_client│└──main.go├──helloword│└──helloword.proto└──greeter_server|└──main.go

运行样例代码

编译执行 server 代码:

$gorungreeter_server/main.go

在新开一个终端,编译执行 client 代码,可以看到输出:

$gorungreeter_client/main.go2021/09/1116:28:29Greeting:Helloworld

gRPC 的 Banlancer

greeter_client/main.go中,是通过指定 server 地址的方式来实现访问到目标服务的

...const(address="localhost:50051")funcmain(){conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithBlock())...}

但这种方式在生产环境是不可行的,因为我们并不知道目标服务的地址(目标服务的地址也有可能不只一个)。实际上,gRPC 已经为我们提供来解决方案:Balancer。

首先,看一下 gRPC 客户端负载均衡实现的官方架构图:

从图中,可以看到 Balancer 均衡器位于架构的最右方,内置一个 Picker 模块,Balancer 主要完成下面几个功能:

与 Rersovler 通信(维持通信机制),接收 Resovler 通知的服务端列表更新,维护 Connection Pool 及每个连接的状态

对上一步获取的服务端列表,调用newSubConn异步建立长连接(每个 Backend 一个长连接),同时,监控连接的状态,及时更新 Connection Pool

创建 Picker,Picker 执行的算法就是真正的 LB 逻辑,当客户端使用conn初始化 PRC 方法时,通过 Picker 选择一个存活的连接,返回给客户端,然后调用 UpdatePicker 更新 LB 算法的内置状态,为下一次调用做准备

Balaner 是 gRPC 负载均衡最核心的模块

据此,我们可用通过自定义的 Balancer,在 Balaner 基础上通过实现自定义的naming.Resolver来达到使用 Consul 看发现服务的功能。

大概流程是:

grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer

Balaner 会通过 naming.Resolver 去解析 (Resovle)Dial 传入的 target 得到一个nameing.Watcher

naming.Watcher持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer

实现 Consul Resolver

grpc-go/naming/naming.go中可以看到Resolver接口的声明

typeResolverinterface{//ResolvecreatesaWatcherfortarget.Resolve(targetstring)(Watcher,error)}

需要实现一个Consul Resolver,在里面返回可用的服务端地址列表,在examples目录下新建grpcresolver文件夹,在该文件夹下新建consul.go文件:

packagegrpcresolverimport("fmt""net""strconv""sync""sync/atomic""github.com/hashicorp/consul/api""google.golang.org/grpc/naming")typewatchEntrystruct{addrstringmodiuint64lastuint64}typeconsulWatcherstruct{downint32c*api.Clientservicestringmusync.Mutexwatchedmap[string]*watchEntrylastIndexuint64}func(w*consulWatcher)Close(){atomic.StoreInt32(&w.down,1)}func(w*consulWatcher)Next()([]*naming.Update,error){w.mu.Lock()deferw.mu.Unlock()watched:=w.watchedlastIndex:=w.lastIndexretry://访问Consul,获取可用的服务列表services,meta,err:=w.c.Catalog().Service(w.service,"",&api.QueryOptions{WaitIndex:lastIndex,})iferr!=nil{returnnil,err}iflastIndex==meta.LastIndex{ifatomic.LoadInt32(&w.down)!=0{returnnil,nil}gotoretry}lastIndex=meta.LastIndexvarupdating[]*naming.Updatefor_,s:=rangeservices{ws:=watched[s.ServiceID]fmt.Println(s.ServiceAddress,s.ServicePort)ifws==nil{//如果是新注册的服务ws=&watchEntry{addr:net.JoinHostPort(s.ServiceAddress,strconv.Itoa(s.ServicePort)),modi:s.ModifyIndex,}watched[s.ServiceID]=wsupdating=append(updating,&naming.Update{Op:naming.Add,Addr:ws.addr,})}elseifws.modi!=s.ModifyIndex{//如果是原来的服务updating=append(updating,&naming.Update{Op:naming.Delete,Addr:ws.addr,})ws.addr=net.JoinHostPort(s.ServiceAddress,strconv.Itoa(s.ServicePort))ws.modi=s.ModifyIndexupdating=append(updating,&naming.Update{Op:naming.Add,Addr:ws.addr,})}ws.last=lastIndex}forid,ws:=rangewatched{ifws.last!=lastIndex{delete(watched,id)updating=append(updating,&naming.Update{Op:naming.Delete,Addr:ws.addr,})}}w.watched=watchedw.lastIndex=lastIndexreturnupdating,nil}typeconsulResolverapi.Clientfunc(r*consulResolver)Resolve(targetstring)(naming.Watcher,error){return&consulWatcher{c:(*api.Client)(r),service:target,watched:make(map[string]*watchEntry),},nil}funcForConsul(reg*api.Client)naming.Resolver{return(*consulResolver)(reg)}

server 端通过 Consul 注册服务

修改examples/helloword/greeter_server/main.go,在启动服务前,将服务的信息注册到 Consul

packagemainimport("context""encoding/hex""flag""fmt""log""math/rand""net""strconv""time""github.com/hashicorp/consul/api""google.golang.org/grpc"pb"google.golang.org/grpc/examples/helloworld/helloworld")const(//host="192.168.10.102"//port=50051ttl=30*time.Second)//serverisusedtoimplementhelloworld.GreeterServer.typeserverstruct{pb.UnimplementedGreeterServerportint}//SayHelloimplementshelloworld.GreeterServerfunc(s*server)SayHello(ctxcontext.Context,in*pb.HelloRequest)(*pb.HelloReply,error){log.Printf("Received:%v",in.GetName())return&pb.HelloReply{Message:fmt.Sprintf("Hello%s,from%d",in.GetName(),s.port)},nil}funcmain(){host:=flag.String("h","127.0.0.1","host")port:=flag.Int("p",50051,"port")flag.Parse()lis,err:=net.Listen("tcp",net.JoinHostPort(*host,strconv.Itoa(*port)))iferr!=nil{log.Fatalf("failedtolisten:%v",err)}//ConsulClientregistry,err:=api.NewClient(api.DefaultConfig())iferr!=nil{log.Fatalln(err)}varh[16]byterand.Read(h[:])//生成一个全局IDid:=fmt.Sprintf("helloserver-%s-%d",hex.EncodeToString(h[:]),*port)fmt.Println(id)//注册到Consul,包含地址、端口信息,以及健康检查err=registry.Agent().ServiceRegister(&api.AgentServiceRegistration{ID:id,Name:"helloserver",Port:*port,Address:*host,Check:&api.AgentServiceCheck{TTL:(ttl+time.Second).String(),Timeout:time.Minute.String(),},})iferr!=nil{log.Fatalln(err)}gofunc(){checkid:="service:"+idforrangetime.Tick(ttl){err:=registry.Agent().PassTTL(checkid,"")iferr!=nil{log.Fatalln(err)}}}()s:=grpc.NewServer()pb.RegisterGreeterServer(s,&server{port:*port})iferr:=s.Serve(lis);err!=nil{log.Fatalf("failedtoserve:%v",err)}}

client 端通过 Consul 发现服务

packagemainimport("context""log""time""github.com/hashicorp/consul/api""google.golang.org/grpc""google.golang.org/grpc/examples/grpcresolver"pb"google.golang.org/grpc/examples/helloworld/helloworld")const(address="localhost:50051"defaultName="world")funcmain(){//consulregistry,err:=api.NewClient(api.DefaultConfig())iferr!=nil{log.Fatalln(err)}//自定义LB,并使用刚才写的ConsulResolverlbrr:=grpc.RoundRobin(grpcresolver.ForConsul(registry))//Setupaconnectiontotheserver.conn,err:=grpc.Dial("helloserver",grpc.WithInsecure(),grpc.WithBalancer(lbrr))iferr!=nil{log.Fatalf("didnotconnect:%v",err)}deferconn.Close()c:=pb.NewGreeterClient(conn)//调用server端RPC,通过响应观察负载均衡forrangetime.Tick(time.Second){name:=defaultNamer,err:=c.SayHello(context.Background(),&pb.HelloRequest{Name:name})iferr!=nil{log.Fatalf("couldnotgreet:%v",err)continue}log.Printf("serverreply:%s",r.GetMessage())}}

启动 & 查看

启动两个 Server,设置不同的启动端口

$cdcdgrpc-go/examples/helloworld0
$cdcdgrpc-go/examples/helloworld1

通过 Consul Web UI 查看,两个 instance 均是健康的

启动 Client

$cdcdgrpc-go/examples/helloworld2

可以看到是均匀对两个 server 发起调用,当我们将其中一个 instance server2 关掉(模拟不可用的情况),流量全面全部转移到另一台上了

说明失败转移也是正常的。

作者:Zioyi


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/4541.html