1.前言
之前自己写了一些关于Zookeeper的基础知识,Zookeeper作为一种协调分布式应用高性能的调度服务,实际的应用场景也非常的广泛,这里主要通过几个例子来具体的说明Zookeeper在特定场景下的使用方式(下面的这些功能估计consul和etcd也能实现,以后学到了再说吧)。
2.具体应用
2.1.一致性配置管理
我们在开发的时候,有时候需要获取一些公共的配置,比如数据库连接信息等,并且偶然可能需要更新配置。如果我们的服务器有N多台的话,那修改起来会特别的麻烦,并且还需要重新启动。这里Zookeeper就可以很方便的实现类似的功能。
2.1.1.思路
将公共的配置存放在Zookeeper的节点中
应用程序可以连接到Zookeeper中并对Zookeeper中配置节点进行读取或者修改(对于写操作可以进行权限验证设置),下面是具体的流程图:
2.1.2.事例
数据库配置信息一致性的维护
配置类:
publicclassCommonConfigimplementsSerializable{//数据库连接配置privateStringdbUrl;privateStringusername;privateStringpassword;privateStringdriverClass;publicCommonConfig(){}publicCommonConfig(StringdbUrl,Stringusername,Stringpassword,StringdriverClass){super();this.dbUrl=dbUrl;this.username=username;this.password=password;this.driverClass=driverClass;}publicStringgetDbUrl(){returndbUrl;}publicvoidsetDbUrl(StringdbUrl){this.dbUrl=dbUrl;}publicStringgetUsername(){returnusername;}publicvoidsetUsername(Stringusername){this.username=username;}publicStringgetPassword(){returnpassword;}publicvoidsetPassword(Stringpassword){this.password=password;}publicStringgetDriverClass(){returndriverClass;}publicvoidsetDriverClass(StringdriverClass){this.driverClass=driverClass;}@OverridepublicStringtoString(){return"CommonConfig:{dbUrl:"+this.dbUrl+",username:"+this.username+",password:"+this.password+",driverClass:"+this.driverClass+"}";}}
配置管理中心
获取本地配置信息
修改配置,并同步
同步配置信息到Zookeeper服务器
publicclassZkConfigMng{privateStringnodePath="/commConfig";privateCommonConfigcommonConfig;privateZkClientzkClient;publicCommonConfiginitConfig(CommonConfigcommonConfig){if(commonConfig==null){this.commonConfig=newCommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8","root","root","com.mysql.jdbc.Driver");}else{this.commonConfig=commonConfig;}returnthis.commonConfig;}/***更新配置**@paramcommonConfig*@return*/publicCommonConfigupdate(CommonConfigcommonConfig){if(commonConfig!=null){this.commonConfig=commonConfig;}syncConfigToZookeeper();returnthis.commonConfig;}publicvoidsyncConfigToZookeeper(){if(zkClient==null){zkClient=newZkClient("127.0.0.1:2181");}if(!zkClient.exists(nodePath)){zkClient.createPersistent(nodePath);}zkClient.writeData(nodePath,commonConfig);}}
以上是提供者,下面我们需要一个客户端获取这些配置
publicclassZkConfigClientimplementsRunnable{privateStringnodePath="/commConfig";privateCommonConfigcommonConfig;@Overridepublicvoidrun(){ZkClientzkClient=newZkClient(newZkConnection("127.0.0.1:2181",5000));while(!zkClient.exists(nodePath)){System.out.println("配置节点不存在!");try{TimeUnit.SECONDS.sleep(1);}catch(InterruptedExceptione){e.printStackTrace();}}//获取节点commonConfig=(CommonConfig)zkClient.readData(nodePath);System.out.println(commonConfig.toString());zkClient.subscribeDataChanges(nodePath,newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(StringdataPath)throwsException{if(dataPath.equals(nodePath)){System.out.println("节点:"+dataPath+"被删除了!");}}@OverridepublicvoidhandleDataChange(StringdataPath,Objectdata)throwsException{if(dataPath.equals(nodePath)){System.out.println("节点:"+dataPath+",数据:"+data+"-更新");commonConfig=(CommonConfig)data;}}});}}
下面启动Main函数
配置管理服务启动
publicstaticvoidmain(String[]args)throwsInterruptedException{SpringApplication.run(ZookeeperApiDemoApplication.class,args);ZkConfigMngzkConfigMng=newZkConfigMng();zkConfigMng.initConfig(null);zkConfigMng.syncConfigToZookeeper();TimeUnit.SECONDS.sleep(10);//修改值zkConfigMng.update(newCommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8","root","wxh","com.mysql.jdbc.Driver"));}}
客户端启动:
publicstaticvoidmain(String[]args)throwsInterruptedException{SpringApplication.run(ZookeeperApiDemoApplication.class,args);ExecutorServiceexecutorService=Executors.newFixedThreadPool(3);//模拟多个客户端获取配置executorService.submit(newZkConfigClient());executorService.submit(newZkConfigClient());executorService.submit(newZkConfigClient());}}
2.2.分布式锁
在我们日常的开发中,如果是单个进程中对共享资源的访问,我们只需要用synchronized或者lock就能实现互斥操作。但是对于跨进程、跨主机、跨网络的共享资源似乎就无能为力了。
另外,分布式系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。
2.1.1.思路
首先zookeeper中我们可以创建一个/distributed_lock
持久化节点
然后再在/distributed_lock
节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
获取所有的/distributed_lock
下的所有子节点,并排序
判读自己创建的节点是否最小值(第一位)
如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)
下面是具体的流程图:
2.1.3.事例
publicclassDistributedLock{//常亮staticclassConstant{privatestaticfinalintSESSION_TIMEOUT=10000;privatestaticfinalStringCONNECTION_STRING="127.0.0.1:2181";privatestaticfinalStringLOCK_NODE="/distributed_lock";privatestaticfinalStringCHILDREN_NODE="/task_";}privateZkClientzkClient;publicDistributedLock(){//连接到ZookeeperzkClient=newZkClient(newZkConnection(Constant.CONNECTION_STRING));if(!zkClient.exists(Constant.LOCK_NODE)){zkClient.create(Constant.LOCK_NODE,"分布式锁节点",CreateMode.PERSISTENT);}}publicStringgetLock(){try{//1。在Zookeeper指定节点下创建临时顺序节点StringlockName=zkClient.createEphemeralSequential(Constant.LOCK_NODE+Constant.CHILDREN_NODE,"");//尝试获取锁acquireLock(lockName);returnlockName;}catch(Exceptione){e.printStackTrace();}returnnull;}/***获取锁*@throwsInterruptedException*/publicBooleanacquireLock(StringlockName)throwsInterruptedException{//2.获取lock节点下的所有子节点List<String>childrenList=zkClient.getChildren(Constant.LOCK_NODE);//3.对子节点进行排序,获取最小值Collections.sort(childrenList,newComparator<String>(){@Overridepublicintcompare(Stringo1,Stringo2){returnInteger.parseInt(o1.split("_")[1])-Integer.parseInt(o2.split("_")[1]);}});//4.判断当前创建的节点是否在第一位intlockPostion=childrenList.indexOf(lockName.split("/")[lockName.split("/").length-1]);if(lockPostion<0){//不存在该节点thrownewZkNodeExistsException("不存在的节点:"+lockName);}elseif(lockPostion==0){//获取到锁System.out.println("获取到锁:"+lockName);returntrue;}elseif(lockPostion>0){//未获取到锁,阻塞System.out.println("......未获取到锁,阻塞等待。。。。。。");//5.如果未获取得到锁,监听当前创建的节点前一位的节点finalCountDownLatchlatch=newCountDownLatch(1);IZkDataListenerlistener=newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(StringdataPath)throwsException{//6.前一个节点被删除,当不保证轮到自己System.out.println("。。。。。。前一个节点被删除。。。。。。");acquireLock(lockName);latch.countDown();}@OverridepublicvoidhandleDataChange(StringdataPath,Objectdata)throwsException{//不用理会}};try{zkClient.subscribeDataChanges(Constant.LOCK_NODE+"/"+childrenList.get(lockPostion-1),listener);latch.await();}finally{zkClient.unsubscribeDataChanges(Constant.LOCK_NODE+"/"+childrenList.get(lockPostion-1),listener);}}returnfalse;}/***释放锁(删除节点)**@paramlockName*/publicvoidreleaseLock(StringlockName){zkClient.delete(lockName);}publicvoidcloseZkClient(){zkClient.close();}}@SpringBootApplicationpublicclassZookeeperDemoApplication{publicstaticvoidmain(String[]args)throwsInterruptedException{SpringApplication.run(ZookeeperDemoApplication.class,args);DistributedLocklock=newDistributedLock();StringlockName=lock.getLock();/***执行我们的业务逻辑*/if(lockName!=null){lock.releaseLock(lockName);}lock.closeZkClient();}}
2.3.分布式队列
在日常使用中,特别是像生产者消费者模式中,经常会使用BlockingQueue来充当缓冲区的角色。但是在分布式系统中这种方式就不能使用BlockingQueue来实现了,但是Zookeeper可以实现。
2.1.1.思路
首先利用Zookeeper中临时顺序节点的特点
当生产者创建节点生产时,需要判断父节点下临时顺序子节点的个数,如果达到了上限,则阻塞等待;如果没有达到,就创建节点。
当消费者获取节点时,如果父节点中不存在临时顺序子节点,则阻塞等待;如果有子节点,则获取执行自己的业务,执行完毕后删除该节点即可。
获取时获取最小值,保证FIFO特性。
2.1.2.事例
这个是一个消费者对一个生产者,如果是多个消费者对多个生产者,对代码需要调整。
publicinterfaceAppConstant{staticStringZK_CONNECT_STR="127.0.0.1:2181";staticStringNODE_PATH="/mailbox";staticStringCHILD_NODE_PATH="/mail_";staticintMAILBOX_SIZE=10;}publicclassMailConsumerimplementsRunnable,AppConstant{privateZkClientzkClient;privateLocklock;privateConditioncondition;publicMailConsumer(){lock=newReentrantLock();condition=lock.newCondition();zkClient=newZkClient(newZkConnection(ZK_CONNECT_STR));System.out.println("sucessconnectedtozookeeperserver!");//不存在就创建mailbox节点if(!zkClient.exists(NODE_PATH)){zkClient.create(NODE_PATH,"thisismailbox",CreateMode.PERSISTENT);}}@Overridepublicvoidrun(){IZkChildListenerlistener=newIZkChildListener(){@OverridepublicvoidhandleChildChange(StringparentPath,List<String>currentChilds)throwsException{System.out.println("Znode["+parentPath+"]size:"+currentChilds.size());//还是要判断邮箱是否为空if(currentChilds.size()>0){//唤醒等待的线程try{lock.lock();condition.signal();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}};//监视子节点的改变,不用放用while循环中,监听一次就行了,不需要重复绑定zkClient.subscribeChildChanges(NODE_PATH,listener);try{//循环随机发送邮件模拟真是情况while(true){//判断是否可以发送邮件checkMailReceive();//接受邮件List<String>mailList=zkClient.getChildren(NODE_PATH);//如果mailsize==0,也没有关系;可以直接循环获取就行了if(mailList.size()>0){Collections.sort(mailList,newComparator<String>(){@Overridepublicintcompare(Stringo1,Stringo2){returnInteger.parseInt(o1.split("_")[1])-Integer.parseInt(o2.split("_")[1]);}});//模拟邮件处理(0-1S)TimeUnit.MILLISECONDS.sleep(newRandom().nextInt(1000));zkClient.delete(NODE_PATH+"/"+mailList.get(0));System.out.println("mailhasbeenreceived:"+NODE_PATH+"/"+mailList.get(0));}}}catch(Exceptione){e.printStackTrace();}finally{zkClient.unsubscribeChildChanges(NODE_PATH,listener);}}privatevoidcheckMailReceive(){try{lock.lock();//判断邮箱是为空List<String>mailList=zkClient.getChildren(NODE_PATH);System.out.println("mailboxsize:"+mailList.size());if(mailList.size()==0){//邮箱为空,阻塞消费者,直到邮箱有邮件System.out.println("mailboxisempty,pleasewait。。。");condition.await();//checkMailReceive();}}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}publicclassMailProducerimplementsRunnable,AppConstant{privateZkClientzkClient;privateLocklock;privateConditioncondition;/***初始化状态*/publicMailProducer(){lock=newReentrantLock();condition=lock.newCondition();zkClient=newZkClient(newZkConnection(ZK_CONNECT_STR));System.out.println("sucessconnectedtozookeeperserver!");//不存在就创建mailbox节点if(!zkClient.exists(NODE_PATH)){zkClient.create(NODE_PATH,"thisismailbox",CreateMode.PERSISTENT);}}@Overridepublicvoidrun(){IZkChildListenerlistener=newIZkChildListener(){@OverridepublicvoidhandleChildChange(StringparentPath,List<String>currentChilds)throwsException{System.out.println("Znode["+parentPath+"]size:"+currentChilds.size());//还是要判断邮箱是否已满if(currentChilds.size()<MAILBOX_SIZE){//唤醒等待的线程try{lock.lock();condition.signal();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}};//监视子节点的改变,不用放用while循环中,监听一次就行了,不需要重复绑定zkClient.subscribeChildChanges(NODE_PATH,listener);try{//循环随机发送邮件模拟真是情况while(true){//判断是否可以发送邮件checkMailSend();//发送邮件StringcretePath=zkClient.createEphemeralSequential(NODE_PATH+CHILD_NODE_PATH,"yourmail");System.out.println("yourmailhasbeensend:"+cretePath);//模拟随机间隔的发送邮件(0-10S)TimeUnit.MILLISECONDS.sleep(newRandom().nextInt(1000));}}catch(Exceptione){e.printStackTrace();}finally{zkClient.unsubscribeChildChanges(NODE_PATH,listener);}}privatevoidcheckMailSend(){try{lock.lock();//判断邮箱是否已满List<String>mailList=zkClient.getChildren(NODE_PATH);System.out.println("mailboxsize:"+mailList.size());if(mailList.size()>=MAILBOX_SIZE){//邮箱已满,阻塞生产者,直到邮箱有空间System.out.println("mailboxisfull,pleasewait。。。");condition.await();checkMailSend();}}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}
2.4.均衡负载
首先我们需要简单的理解分布式和集群,通俗点说:分布式就是将一个系统拆分到多个独立运行的应用中(有可能在同一台主机也有可能在不同的主机上),集群就是将单个独立的应用复制多分放在不同的主机上来减轻服务器的压力。
而Zookeeper不仅仅可以作为分布式集群的服务注册调度中心(例如dubbo),也可以实现集群的负载均衡。
2.4.1.思路
首先我们要理解,如果是一个集群,那么他就会有多台主机。所以,他在Zookeeper中信息的存在应该是如下所示:
如上的结构,当服务调用方调用服务时,就可以根据特定的均衡负载算法来实现对服务的调用(调用前需要监听/service/serviceXXX节点,以更新列表数据)
2.4.2.事例
/***服务提供者**@authorAdministrator**/publicclassServiceProvider{//静态常量staticStringZK_CONNECT_STR="127.0.0.1:2181";staticStringNODE_PATH="/service";staticStringSERIVCE_NAME="/myService";privateZkClientzkClient;publicServiceProvider(){zkClient=newZkClient(newZkConnection(ZK_CONNECT_STR));System.out.println("sucessconnectedtozookeeperserver!");//不存在就创建NODE_PATH节点if(!zkClient.exists(NODE_PATH)){zkClient.create(NODE_PATH,"thisismailbox",CreateMode.PERSISTENT);}}publicvoidregistryService(StringlocalIp,Objectobj){if(!zkClient.exists(NODE_PATH+SERIVCE_NAME)){zkClient.create(NODE_PATH+SERIVCE_NAME,"providerserviceslist",CreateMode.PERSISTENT);}//对自己的服务进行注册zkClient.createEphemeral(NODE_PATH+SERIVCE_NAME+"/"+localIp,obj);System.out.println("注册成功!["+localIp+"]");}}/***消费者,通过某种均衡负载算法选择某一个提供者**@authorAdministrator**/publicclassServiceConsumer{//静态常量staticStringZK_CONNECT_STR="127.0.0.1:2181";staticStringNODE_PATH="/service";staticStringSERIVCE_NAME="/myService";privateList<String>serviceList=newArrayList<String>();privateZkClientzkClient;publicServiceConsumer(){zkClient=newZkClient(newZkConnection(ZK_CONNECT_STR));System.out.println("sucessconnectedtozookeeperserver!");//不存在就创建NODE_PATH节点if(!zkClient.exists(NODE_PATH)){zkClient.create(NODE_PATH,"thisismailbox",CreateMode.PERSISTENT);}}/***订阅服务*/publicvoidsubscribeSerivce(){serviceList=zkClient.getChildren(NODE_PATH+SERIVCE_NAME);zkClient.subscribeChildChanges(NODE_PATH+SERIVCE_NAME,newIZkChildListener(){@OverridepublicvoidhandleChildChange(StringparentPath,List<String>currentChilds)throwsException{serviceList=currentChilds;}});}/***模拟调用服务*/publicvoidconsume(){//负载均衡算法获取某台机器调用服务intindex=newRandom().nextInt(serviceList.size());System.out.println("调用["+NODE_PATH+SERIVCE_NAME+"]服务:"+serviceList.get(index));}}
3.总结
Zookeeper是一个功能非常强大的应用,除了上面几种应用外,还有命名服务、分布式协调通知等也是常用的场景。
原文链接:https://blog.csdn.net/u013468915/article/details/80955110