在微服务体系中,任何一个服务都可以部署多个实例,因此一个请求应当发送到哪个实例来处理,需要有一个策略来选择处理节点。
Selector是基于注册中心registry来进行节点选择和状态标记的。在选择过程中可以使用不同的算法进行选择。
Selector接口定义为
//Selectorbuildsontheregistryasamechanismtopicknodes//andmarktheirstatus.Thisallowshostpoolsandotherthings//tobebuiltusingvariousalgorithms.typeSelectorinterface{Init(opts...Option)errorOptions()Options//Select返回一个函数,返回值函数应该返回下一个节点Select(servicestring,opts...SelectOption)(Next,error)//Marksetsthesuccess/erroragainstanodeMark(servicestring,node*registry.Node,errerror)//ResetreturnsstatebacktozeroforaserviceReset(servicestring)//CloserenderstheselectorunusableClose()error//NameoftheselectorString()string}
如果需要不同的选择策略, 可以自定义选择策略
//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next
selector默认的实现
funcNewSelector(opts...Option)Selector{sopts:=Options{Strategy:Random,}for_,opt:=rangeopts{opt(&sopts)}ifsopts.Registry==nil{sopts.Registry=registry.DefaultRegistry}s:=®istrySelector{so:sopts,}s.rc=s.newCache()returns}
在默认的实现中,节点选择策略为随机选择
funcinit(){rand.Seed(time.Now().UnixNano())}//RandomisarandomstrategyalgorithmfornodeselectionfuncRandom(services[]*registry.Service)Next{nodes:=make([]*registry.Node,0,len(services))for_,service:=rangeservices{nodes=append(nodes,service.Nodes...)}returnfunc()(*registry.Node,error){iflen(nodes)==0{returnnil,ErrNoneAvailable}i:=rand.Int()%len(nodes)returnnodes[i],nil}}
节点选择流程如下:
根据指定的服务名,获取服务列表
根据服务选项中的过滤器,进行过滤
如果还存在多个节点,那么根据选择策略,选择一个节点返回
func(c*registrySelector)Select(servicestring,opts...SelectOption)(Next,error){sopts:=SelectOptions{Strategy:c.so.Strategy,}for_,opt:=rangeopts{opt(&sopts)}//gettheservice//trythecachefirst//ifthatfailsgodirectlytotheregistryservices,err:=c.rc.GetService(service)iferr!=nil{iferr==registry.ErrNotFound{returnnil,ErrNotFound}returnnil,err}//applythefiltersfor_,filter:=rangesopts.Filters{services=filter(services)}//ifthere'snothingleft,returniflen(services)==0{returnnil,ErrNoneAvailable}returnsopts.Strategy(services),nil}
从examples中介绍的客户端负载案例中,我们可以看看如何去使用
funcmain(){cmd.Init()client.DefaultClient=client.NewClient(client.Selector(FirstNodeSelector()),)fmt.Println("\n---Callexample---")fori:=0;i<10;i++{call(i)}}
FirstNodeSelector 的返回值是一个 firstNodeSelector 负载器,它是 Selector 接口的实现者,逻辑非常简单:永远选择服务列表中的第一个节点。
//BuiltinrandomhashednodeselectortypefirstNodeSelectorstruct{optsselector.Options}func(n*firstNodeSelector)Init(opts...selector.Option)error{for_,o:=rangeopts{o(&n.opts)}returnnil}func(n*firstNodeSelector)Options()selector.Options{returnn.opts}func(n*firstNodeSelector)Select(servicestring,opts...selector.SelectOption)(selector.Next,error){services,err:=n.opts.Registry.GetService(service)iferr!=nil{returnnil,err}iflen(services)==0{returnnil,selector.ErrNotFound}varsoptsselector.SelectOptionsfor_,opt:=rangeopts{opt(&sopts)}for_,filter:=rangesopts.Filters{services=filter(services)}iflen(services)==0{returnnil,selector.ErrNotFound}iflen(services[0].Nodes)==0{returnnil,selector.ErrNotFound}returnfunc()(*registry.Node,error){returnservices[0].Nodes[0],nil},nil}func(n*firstNodeSelector)Mark(servicestring,node*registry.Node,errerror){return}func(n*firstNodeSelector)Reset(servicestring){return}func(n*firstNodeSelector)Close()error{returnnil}func(n*firstNodeSelector)String()string{return"first"}
继续看 client 的代码:在实例化 client 时传入我们自己的选择器。这也说明,只要实现了 Selector 接口,就可以使用自定义的选择器。
// Replace the default client with one that selects nodes through the
// custom first-node selector.
client.DefaultClient = client.NewClient(
	client.Selector(FirstNodeSelector()),
)
在服务调用时,示例中的 call 函数通过 client.Call 发起请求,下面来看看整个流程是如何运作的:
funccall(iint){//Createnewrequesttoservicego.micro.srv.example,methodExample.Callreq:=client.NewRequest("go.micro.srv.example","Example.Call",&example.Request{Name:"John",})rsp:=&example.Response{}//Callserviceiferr:=client.Call(context.Background(),req,rsp);err!=nil{fmt.Println("callerr:",err,rsp)return}fmt.Println("Call:",i,"rsp:",rsp.Msg)}
如果你使用的是 gRPC,client.Call 的实现如下:
func(g*grpcClient)Call(ctxcontext.Context,reqclient.Request,rspinterface{},opts...client.CallOption)error{ifreq==nil{returnerrors.InternalServerError("go.micro.client","reqisnil")}elseifrsp==nil{returnerrors.InternalServerError("go.micro.client","rspisnil")}//makeacopyofcalloptscallOpts:=g.opts.CallOptionsfor_,opt:=rangeopts{opt(&callOpts)}next,err:=g.next(req,callOpts)iferr!=nil{returnerr}//checkifwealreadyhaveadeadlined,ok:=ctx.Deadline()if!ok{//nodeadlinesowecreateanewonevarcancelcontext.CancelFuncctx,cancel=context.WithTimeout(ctx,callOpts.RequestTimeout)defercancel()}else{//gotadeadlinesononeedtosetupcontext//butweneedtosetthetimeoutwepassalongopt:=client.WithRequestTimeout(time.Until(d))opt(&callOpts)}//shouldwenooprighthere?select{case<-ctx.Done():returnerrors.New("go.micro.client",fmt.Sprintf("%v",ctx.Err()),408)default:}//makecopyofcallmethodgcall:=g.call//wrapthecallinreversefori:=len(callOpts.CallWrappers);i>0;i--{gcall=callOpts.CallWrappers[i-1](gcall)}//returnerrors.New("go.micro.client","requesttimeout",408)call:=func(iint)error{//callbackofffirst.Someonemaywantaninitialstartdelayt,err:=callOpts.Backoff(ctx,req,i)iferr!=nil{returnerrors.InternalServerError("go.micro.client",err.Error())}//onlysleepifgreaterthan0ift.Seconds()>0{time.Sleep(t)}//selectnextnodenode,err:=next()service:=req.Service()iferr!=nil{iferr==selector.ErrNotFound{returnerrors.InternalServerError("go.micro.client","service%s:%s",service,err.Error())}returnerrors.InternalServerError("go.micro.client","errorselecting%snode:%s",service,err.Error())}//makethecallerr=gcall(ctx,node,req,rsp,callOpts)g.opts.Selector.Mark(service,node,err)ifverr,ok:=err.(*errors.Error);ok{returnverr}returnerr}ch:=make(chanerror,callOpts.Retries+1)vargerrerrorfori:=0;i<=callOpts.Retries;i++{gofunc(iint){ch<-call(i)}(i)select{case<-ctx.Done():returnerrors.New("go.micro.client",fmt.Sprintf("%v",ctx.Err()),408)caseerr:=<-ch://ifthecallsucceededletsbailearlyiferr==nil{returnnil}retry,rerr:=callOpts.Retry(ctx,req,i
,err)ifrerr!=nil{returnrerr}if!retry{returnerr}gerr=err}}returngerr}
它会调用 next 方法来获取节点:
//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next0
在真正的调用之前,会进行节点选择
//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next1
以上就是节点选择器的大致内容,如果想要了解更详细的内容,可以去这里go-micro看更详细的代码。