导读:本篇文章首席CTO笔记来给大家介绍有关django中怎么用kafka的相关内容,希望对大家有所帮助,一起来看看吧。
本文目录一览:
1、[Django] celery的替代品 funboost2、Kafka在大数据环境中如何应用呢?3、如何使用python 连接kafka 并获取数据[Django] celery的替代品 funboost
Django开发web应用的过程中,一个老大难问题是异步调度问题。例如用户传来一个非常耗时的请求,这时候最好的处理方式是先把这个操作请求记录下来,先响应请求,等后面有空的时候再去计算,而不是让用户干等着着急。
这种优化方式就是典型的生产者+消息队列+消费者设计模式,而Django框架本身并没有直接提供该设计模式的实现,大多教程都是利用第三方组件celery+redis来实现这个调度。
遗憾的是celery和redis官方都不支持windows,而我习惯的开发环境还是win10,所以需要找一个替代品。经过调研,发现了一个很好的【python分布式函数调度框架——funboost】. 它的优点很多,对Django开发来说,最大的亮点是完全无需启动第三方服务,即可实现生产消费设计模式。一个 pip install funboost 即可干活,开箱即用。它可以使用SQLite文件来做消息队列,足以应对小型应用开发。当然也可以使用Kafka这种高级的消息中间件,实现高可用。
要说缺点吧,这个组件的日志打印太啰嗦,而且没有提供关闭选项,控制台已被它刷屏。
Kafka在大数据环境中如何应用呢?
我们生活在一个数据爆炸的时代,数据的巨量增长给我们的业务处理带来了压力,同时巨量的数据也给我们带来了十分可观的财富。随着大数据将各个行业用户、运营商、服务商的数据整合进大数据环境,或用户取用大数据环境中海量的数据,业务平台间的消息处理将变得尤为复杂。如何高效地采集、使用数据,如何减轻各业务系统的压力,也变得越来越突出。在早期的系统实现时,业务比较简单。即便是数据量、业务量比较大,大数据环境也能做出处理。但是随着接入的系统增多,数据量、业务量增大,大数据环境、业务系统都可出现一定的瓶颈。下面我们看几个场景。
场景一:我们开发过一个设备信息挖掘平台。这个平台需要实时将采集互联网关采集到的路由节点的状态信息存入数据中心。通常一个网关一次需要上报几十甚至几百个变化的路由信息。全区有几万个这种互联网关。当信息采集平台将这些变化的数据信息写入或更新到数据库时候,会给数据库代理非常大的压力,甚至可以直接将数据库搞挂掉。这就对我们的数据采集系统提出了很高的要求。如何稳定高效地把消息更新到数据库这一要求摆了出来。
场景二:数据中心处理过的数据需要实时共享给几个不同的机构。我们常采用的方法是将数据批量存放在数据采集机,分支机构定时来采集;或是分支机构通过JDBC、RPC、http或其他机制实时从数据中心获取数据。这两种方式都存在一定的问题,前者在于实时性不足,还牵涉到数据完整性问题;后者在于,当数据量很大的时候,多个分支机构同时读取数据,会对数据中心的造成很大的压力,也造成很大的资源浪费。
为了解决以上场景提出的问题,我们需要这样一个消息系统:
缓冲能力,系统可以提供一个缓冲区,当有大量数据来临时,系统可以将数据可靠的缓冲起来,供后续模块处理;
订阅、分发能力,系统可以接收消息可靠的缓存下来,也可以将可靠缓存的数据发布给使用者。
这就要我们找一个高吞吐的、能满足订阅发布需求的系统。
Kafka是一个分布式的、高吞吐的、基于发布/订阅的消息系统。利用kafka技术可以在廉价PC Server上搭建起大规模的消息系统。Kafka具有消息持久化、高吞吐、分布式、实时、低耦合、多客户端支持、数据可靠等诸多特点,适合在线和离线的消息处理。
互联网关采集到变化的路由信息,通过kafka的producer将归集后的信息批量传入kafka。Kafka按照接收顺序对归集的信息进行缓存,并加入待消费队列。Kafka的consumer读取队列信息,并一定的处理策略,将获取的信息更新到数据库。完成数据到数据中心的存储。
数据中心的数据需要共享时,kafka的producer先从数据中心读取数据,然后传入kafka缓存并加入待消费队列。各分支结构作为数据消费者,启动消费动作,从kafka队列读取数据,并对获取的数据进行处理。
消息生产者根据需求,灵活定义produceInfoProcess()方法,对相关数据进行处理。并依据数据发布到kafka的情况,处理回调机制。在数据发送失败时,定义failedSend()方法;当数据发送成功时,定义successedSend()方法。
如何使用python 连接kafka 并获取数据
连接
kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel
库
Producer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
msg
Tip
consumer
必需在
producer
向
kafka
的
topic
里面提交数据后才能连接,否则会出错。
在
Kafka
中一个
consumer
需要指定
groupname
,
groue
中保存着
offset
等信息,新开启一个
group
会从
offset
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。
结语:以上就是首席CTO笔记为大家整理的关于django中怎么用kafka的相关内容解答汇总了,希望对您有所帮助!如果解决了您的问题欢迎分享给更多关注此问题的朋友喔~