[源码分析] 消息队列 Kombu 之 Consumer
目录
- [源码分析] 消息队列 Kombu 之 Consumer
- 0x00 摘要
- 0x01 综述功能
- 0x02 示例代码
- 0x03 定义
- 3.1 定义
- 3.2 Queue
- 0x04 Init
- 4.1 处理调用
- 4.1.1 queues
- 4.1.2 channel
- 4.1.3 on_message
- 4.2 建立联系
- 4.2.1 channel与queue
- 4.2.2 channel与exchange
- 4.2.3 Exchange & Binding
- 4.2.3.1 Channel binding
- 4.2.3.2 使用
- 4.1 处理调用
- 0x05 完善联系
- 5.1 遍历Queue
- 5.2 consume in Queue
- 5.3 consume in Channel
- 0x06 消费消息
- 6.1 drain_events in Connection
- 6.2 drain_events in Transport
- 6.3 get in MultiChannelPoller
- 6.3.1 _register_BRPOP in MultiChannelPoller
- 6.3.2 register in _poll
- 6.3.3 poll(timeout) in MultiChannelPoller
- 6.3.4 注册到redis驱动,负载均衡
- 6.3.4 handle_event in MultiChannelPoller
- 6.3.5 on_readable in MultiChannelPoller
- 6.3.6 _brpop_read in Channel
- 6.3.7 从redis读取
- 6.3.8 回到_brpop_read
- 6.3.9 _deliver in Transport
- 6.3.10 basic_consume in Channel
- 6.3.11 _receive_callback in Consumer
- 0xFF 参考
0x00 摘要
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Consumer 概念。
0x01 综述功能
Consumer 的作用主要如下:
- Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
- Queue:对应的队列抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
- Consumers : 是接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。就是说,从用户角度,知道了一个 exchange,就可以从中读取消息,具体这个消息就是从 queue 中读取的。
在具体的实现中,Consumer 把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问redis。Queue 也有 Exchange,知道访问具体 redis 哪个key(就是queue对应的那个key)。即 Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
所以服务端的逻辑大致为:
- 建立连接;
- 创建Exchange ;
- 创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key ;
- 创建Consumer对Queue监听;
0x02 示例代码
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
前文已经完成了构建部分,下面来到了Consumer部分,即如下代码:
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(
3, p_message
)
hub.run_forever()
0x03 定义
3.1 定义
Consumer主要成员变量如下:
- channel:存在 (kombu.Connection, Channel) 这两种可能,一个 Connection 就对应一个 MQ 的连接,Channel可以理解成共享一个Connection的多个轻量化连接。
- queues:(Sequence[kombu.Queue])类型。对应 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息
- on_message:消息响应方法;
这也是调用时传入的变量。
class Consumer:
"""Message consumer.
Arguments:
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
auto_declare (bool): see :attr:`auto_declare`
callbacks (Sequence[Callable]): see :attr:`callbacks`.
on_message (Callable): See :attr:`on_message`
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
#: The connection/channel to use for this consumer.
channel = None
#: A single :class:`~kombu.Queue`, or a list of queues to
#: consume from.
queues = None
#: Flag for automatic message acknowledgment.
no_ack = None
#: List of callbacks called in order when a message is received.
callbacks = None
#: Optional function called whenever a message is received.
on_message = None
#: List of accepted content-types.
accept = None
#: Initial prefetch count
prefetch_count = None
#: Mapping of queues we consume from.
_queues = None
_tags = count(1) # global
3.2 Queue
我们也给出 Queue 的定义,其中主要成员变量如下:
- exchange (Exchange): 就是 queue 绑定的 Exchange;
- routing_key (str): 就是 queue 对应的 key;
- channel :queue 绑定的 信道;
具体定义如下:
class Queue(MaybeChannelBound):
"""A Queue declaration.
channel (ChannelT): The channel the Queue is bound to (if bound).
"""
ContentDisallowed = ContentDisallowed
name = ''
exchange = Exchange('')
routing_key = ''
durable = True
exclusive = False
auto_delete = False
no_ack = False
attrs = (
('name', None),
('exchange', None),
('routing_key', None),
('queue_arguments', None),
('binding_arguments', None),
('consumer_arguments', None),
('durable', bool),
('exclusive', bool),
('auto_delete', bool),
('no_ack', None),
('alias', None),
('bindings', list),
('no_declare', bool),
('expires', float),
('message_ttl', float),
('max_length', int),
('max_length_bytes', int),
('max_priority', int)
)
0x04 Init
在此方法中,先处理调用,随之建立联系。
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None, on_message=None,
accept=None, prefetch_count=None, tag_prefix=None):
self.channel = channel
self.queues = maybe_list(queues or [])
self.no_ack = self.no_ack if no_ack is None else no_ack
self.callbacks = (self.callbacks or [] if callbacks is None
else callbacks)
self.on_message = on_message
self.tag_prefix = tag_prefix
self._active_tags = {}
self.accept = prepare_accept_content(accept)
self.prefetch_count = prefetch_count
if self.channel:
self.revive(self.channel)
4.1 处理调用
4.1.1 queues
传入的参数queues被作为成员变量保存起来。
self.queues = maybe_list(queues or [])
4.1.2 channel
传入的参数Connection被作为成员变量保存起来。
self.channel = channel
4.1.3 on_message
传入的参数on_message 作为消息响应方法保存起来。
self.on_message = on_message
4.2 建立联系
用如下方法把 Exchange,Queue 与 Connection 联系起来。
def revive(self, channel):
"""Revive consumer after connection loss."""
self._active_tags.clear()
channel = self.channel = maybe_channel(channel)
# modify dict size while iterating over it is not allowed
for qname, queue in list(self._queues.items()):
# name may have changed after declare
self._queues.pop(qname, None)
queue = self._queues[queue.name] = queue(self.channel)
queue.revive(channel)
if self.auto_declare:
self.declare()
if self.prefetch_count is not None:
self.qos(prefetch_count=self.prefetch_count)
进一步调用:
when_bound, entity.py:598
maybe_bind, abstract.py:76
bind, abstract.py:70
bind, entity.py:590
__call__, abstract.py:66
revive, messaging.py:400
__init__, messaging.py:382
main, testUb.py:46
<module>, testUb.py:55
由此进入到了Queue类。
4.2.1 channel与queue
这里用如下方法把queue与channel联系起来。queue 里面有一个 channel,用来访问redis,Queue 也有 Exchange,知道访问具体 redis 哪里。
每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!
Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
self = {Queue} <Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>
这样,conneciton就是queue的成员变量。
def revive(self, channel):
"""Revive channel after the connection has been re-established.
"""
if self.is_bound:
self._channel = channel
self.when_bound()
4.2.2 channel与exchange
之前我们知道,Queue是包括了exchange成员变量,目前channel也是exchange的成员变量。
Exchange:交换机,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
于是经由如下方法,准备把channel与exchange联系起来。
def when_bound(self):
if self.exchange:
self.exchange = self.exchange(self.channel)
此时变量如下:
channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
self = {Exchange} Exchange asynt(direct)
进而直接在Exchange基类,使用方法maybe_bind把channel与exchange联系起来。
class MaybeChannelBound(Object):
"""Mixin for classes that can be bound to an AMQP channel."""
_channel = None
def __call__(self, channel):
"""`self(channel) -> self.bind(channel)`."""
return self.bind(channel)
def bind(self, channel):
"""Create copy of the instance that is bound to a channel."""
return copy(self).maybe_bind(channel)
def maybe_bind(self, channel):
"""Bind instance to channel if not already bound."""
if not self.is_bound and channel:
self._channel = maybe_channel(channel)
self.when_bound()
self._is_bound = True
return self
4.2.3 Exchange & Binding
这里会把 Exchange 和 queue 联系。就是把 Exchange 和 routing_key 联系起来,然后把这些联系规则放到redis 之中。
堆栈如下:
_queue_bind, redis.py:814
queue_bind, base.py:568
bind_to, entity.py:674
queue_bind, entity.py:662
_create_queue, entity.py:617
declare, entity.py:606
declare, messaging.py:417
revive, messaging.py:404
__init__, messaging.py:382
具体为
class Queue(MaybeChannelBound):
def __init__(self, name='', exchange=None, routing_key='',
channel=None, bindings=None, on_declared=None,
**kwargs):
super().__init__(**kwargs)
self.name = name or self.name
if isinstance(exchange, str):
self.exchange = Exchange(exchange)
elif isinstance(exchange, Exchange):
self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.bindings = set(bindings or [])
self.on_declared = on_declared
# allows Queue('name', [binding(...), binding(...), ...])
if isinstance(exchange, (list, tuple, set)):
self.bindings |= set(exchange)
if self.bindings:
self.exchange = None
# exclusive implies auto-delete.
if self.exclusive:
self.auto_delete = True
self.maybe_bind(channel)
def queue_bind(self, nowait=False, channel=None):
"""Create the queue binding on the server."""
return self.bind_to(self.exchange, self.routing_key,
self.binding_arguments,
channel=channel, nowait=nowait)
def bind_to(self, exchange='', routing_key='',
arguments=None, nowait=False, channel=None):
if isinstance(exchange, Exchange):
exchange = exchange.name
return (channel or self.channel).queue_bind(
queue=self.name,
exchange=exchange,
routing_key=routing_key,
arguments=arguments,
nowait=nowait,
)
4.2.3.1 Channel binding
具体调用到Channel,代码位于 kombu/transport/redis.py。
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
self._fanout_queues[queue] = (
exchange, routing_key.replace('#', '*'),
)
with self.conn_or_acquire() as client:
client.sadd(self.keyprefix_queue % (exchange,),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
代码然后调用到redis client。
# SET COMMANDS
def sadd(self, name, *values):
"Add ``value(s)`` to set ``name``"
return self.execute_command('SADD', name, *values)
具体变量如下,我们代码中,exchange内容为_kombu.binding.asynt_exchange。routing_key的是asynt_routing_key。
name = {str} '_kombu.binding.asynt_exchange'
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
values = {tuple: 1} asynt_routing_keysynt_queue
我们看看Redis内容,发现新建内容如下:
127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
集合名字为:self.keyprefix_queue % (exchange,), 对于我们就为:_kombu.binding.asynt_exchange
。
集合每个item为:routing_key + sep + pattern + sep + queue
。我们这里sep = ‘\x06\x16’。
4.2.3.2 使用
当发消息时候,Exchange的作用是将发送的 routing_key
转化为 queue
的名字。这样发送就知道发到哪个 queue
。这里的 exchange 内容为 _kombu.binding.asynt_exchange。
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
得到的集合内容为:
{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
即从 exchange 得到 routing_key —> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。
逻辑如下:
+---------------------------------+
| exchange |
| |
1 routing_key x | |
+----------+ | | +------------+
| Producer | +-----------------> | routing_key x ---> queue x | | Consumer |
+--------+-+ | | +------------+
| | routing_key y ---> queue y |
| | | ^
| | routing_key z ---> queue z | |
| | | |
| +---------------------------------+ |
| |
| |
| |
| |
| |
| |
| |
| |
| +-----------+ |
| 2 message | | 3 message |
+-------------------------------> | queue X | +--------------------+
| |
+-----------+
因此,此时总体逻辑如下图:
+----------------------+ +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手机如下:
现在我们知道:
- Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。
- Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
- Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
- Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连;
于是逻辑链已经形成,大约是这样的,后文完善:
- Producer发送消息到Exchange;
- Exchange中有成员变量Channel,也有成员变量Queues。
- 于是Exchange负责通过Channel将消息分发至Queue,Exchange的作用只是将发送的
routing_key
转化为queue
的名字。 - Consumer去Queue取消息;
逻辑大致通了,但是缺少动态操作完成此逻辑,我们将在后续完善动态逻辑。
0x05 完善联系
在init之后,第二步会完善联系。
python的上下文管理。在python中实现了__enter__和__exit__方法,即支持上下文管理器协议。上下文管理器就是支持上下文管理器协议的对象,它是为了with而生。当with语句在开始运行时,会在上下文管理器对象上调用 enter 方法。with语句运行结束后,会在上下文管理器对象上调用 exit 方法。
所以这里是调用__enter__
,即 consumer 函数,其目的如下:
- 调用Channel继续处理,
Channel
将Consumer
标签,Consumer
要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。 - 另外,还通过
Transport
将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。
class Consumer:
"""Message consumer.
Arguments:
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
auto_declare (bool): see :attr:`auto_declare`
callbacks (Sequence[Callable]): see :attr:`callbacks`.
on_message (Callable): See :attr:`on_message`
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
def __enter__(self):
self.consume()
return self
5.1 遍历Queue
使用_basic_consume
方法处理Consumer相关的队列列表中的每一项,其中处理最后一个Queue时设置标志nowait=False
。
def consume(self, no_ack=None):
"""Start consuming messages.
Can be called multiple times, but note that while it
will consume from new queues added since the last call,
it will not cancel consuming from removed queues (
use :meth:`cancel_by_queue`).
Arguments:
no_ack (bool): See :attr:`no_ack`.
"""
queues = list(self._queues.values())
if queues:
no_ack = self.no_ack if no_ack is None else no_ack
H, T = queues[:-1], queues[-1]
for queue in H:
self._basic_consume(queue, no_ack=no_ack, nowait=True)
self._basic_consume(T, no_ack=no_ack, nowait=False)
_basic_consume
方法代码如下:
是将消费者标签以及回调函数传给Queue
的consume
方法。
def _basic_consume(self, queue, consumer_tag=None,
no_ack=no_ack, nowait=True):
tag = self._active_tags.get(queue.name)
if tag is None:
tag = self._add_tag(queue, consumer_tag)
queue.consume(tag, self._receive_callback,
no_ack=no_ack, nowait=nowait)
return tag
5.2 consume in Queue
对于每一个 queue,都会调用其 consume 函数。
Queue
的consume
方法代码:
class Queue(MaybeChannelBound):
def consume(self, consumer_tag='', callback=None,
no_ack=None, nowait=False):
"""Start a queue consumer.
Consumers last as long as the channel they were created on, or
until the client cancels them.
Arguments:
consumer_tag (str): Unique identifier for the consumer.
The consumer tag is local to a connection, so two clients
can use the same consumer tags. If this field is empty
the server will generate a unique tag.
no_ack (bool): If enabled the broker will automatically
ack messages.
nowait (bool): Do not wait for a reply.
callback (Callable): callback called for each delivered message.
"""
if no_ack is None:
no_ack = self.no_ack
return self.channel.basic_consume(
queue=self.name,
no_ack=no_ack,
consumer_tag=consumer_tag or '',
callback=callback,
nowait=nowait,
arguments=self.consumer_arguments)
前面提到,queue与channel已经联系了起来。
每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!
Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
5.3 consume in Channel
因此又回到了Channel
,就是Channel
的basic_consume
代码:
调用到基类basic_consume方法。
class Channel(virtual.Channel):
def basic_consume(self, queue, *args, **kwargs):
if queue in self._fanout_queues:
exchange, _ = self._fanout_queues[queue]
self.active_fanout_queues.add(queue)
self._fanout_to_queue[exchange] = queue
ret = super().basic_consume(queue, *args, **kwargs)
# Update fair cycle between queues.
#
# We cycle between queues fairly to make sure that
# each queue is equally likely to be consumed from,
# so that a very busy queue will not block others.
#
# This works by using Redis's `BRPOP` command and
# by rotating the most recently used queue to the
# and of the list. See Kombu github issue #166 for
# more discussion of this method.
self._update_queue_cycle()
return ret
基类是 virtual.Channel,其作用是:
Channel
将Consumer
标签,Consumer
要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。另外,还通过Transport
将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。
变量是:
- _tag_to_queue:标签与队列的映射关系;
- _active_queues:
Consumer
要消费的队列; - _consumers:
Consumer
标签; - connection:
Transport
; - connection._callbacks:队列与回调函数列表的映射关系;
数值如下:
self._tag_to_queue = {dict: 1} {'None1': 'asynt'}
self._active_queues = {list: 1} ['asynt']
self._consumers = {set: 1} {'None1'}
self.connection = {Transport} <kombu.transport.redis.Transport object at 0x7fb3ee0155f8>
self.connection._callbacks = {dict: 1} {'asynt': <function Channel.basic_consume.<locals>._callback at 0x7fb3ecd4a2f0>}
代码如下:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
_reset_cycle 代码如下,看起来就是调用了 FairCycle,实际上没有用到,因为cycle已经有预设。cycle
是一个MultiChannelPoller
实例。
def _reset_cycle(self):
self._cycle = FairCycle(
self._get_and_deliver, self._active_queues, Empty)
具体如下图:
+----------+ +-------+ +---------+
| Consumer | | Queue | | Channel |
+----+-----+ +---+---+ +-----+---+
| | |
| | |
__enter__ | |
| | |
| | |
consume | |
| | |
| | |
_basic_consume | |
| | |
| | |
| consume | |
+------------> | |
| | basic_consume |
| | |
| | +-----------> |
| | |
| | |
| | _reset_cycle
| | |
| | |
| | |
| | |
| | |
v v v
0x06 消费消息
为了更好的分析,我们暂时注销hub,使用drain_events消费消息,这样更直观。
就是说,Consumer 已经和 Channel 联系起来,知道读取redis 中的哪个key。但是现在缺少一个读取消息的引擎。这个引擎可以驱动消息读取,每次有消息,就调用 consumer 中的回调函数来处理消息。
在没有引擎的情况下,drain_events 就可以起到引擎的作用。
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
# hub.timer.call_repeatedly(3, p_message)
# hub.run_forever()
conn.drain_events(timeout=1)
6.1 drain_events in Connection
drain_events 调用 Connection 的方法来进行消费。
def drain_events(self, **kwargs):
"""Wait for a single event from the server.
Arguments:
timeout (float): Timeout in seconds before we give up.
"""
return self.transport.drain_events(self.connection, **kwargs)
6.2 drain_events in Transport
在 Transport中的drain_events ,是在无限执行get(self._deliver, timeout=timeout)
get
是self.cycle
的一个方法,cycle
是一个MultiChannelPoller
实例:
所以get
是<bound method MultiChannelPoller.get of <kombu.transport.redis.MultiChannelPoller object at 0x7feab312b358>>
def drain_events(self, connection, timeout=None):
time_start = monotonic()
get = self.cycle.get
polling_interval = self.polling_interval
if timeout and polling_interval and polling_interval > timeout:
polling_interval = timeout
while 1:
try:
get(self._deliver, timeout=timeout)
except Empty:
if timeout is not None and monotonic() - time_start >= timeout:
raise socket.timeout()
if polling_interval is not None:
sleep(polling_interval)
else:
break
6.3 get in MultiChannelPoller
Transport
相关联的每一个channel都要执行drain_events
。具体分两步:
-
对于每一个channel都注册;
-
进行poll;
代码如下:
def get(self, callback, timeout=None):
self._in_protected_read = True
try:
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
events = self.poller.poll(timeout)
if events:
for fileno, event in events:
ret = self.handle_event(fileno, event)
if ret:
return
# - no new data, so try to restore messages.
# - reset active redis commands.
self.maybe_restore_messages()
raise Empty()
finally:
self._in_protected_read = False
while self.after_read:
try:
fun = self.after_read.pop()
except KeyError:
break
else:
fun()
6.3.1 _register_BRPOP in MultiChannelPoller
具体注册如下,我们先来看看 _register_BRPOP
,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了。
def _register_BRPOP(self, channel):
"""Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
if not channel._in_poll: # send BRPOP
channel._brpop_start()
6.3.2 register in _poll
最终进行Poll注册,这样当redis的socket对应的fd有消息,就会进行处理。
变量如下:<kombu.utils.eventio._poll object at 0x7feab2d7d780>
def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd
6.3.3 poll(timeout) in MultiChannelPoller
当poll有消息,则相应处理。
events = self.poller.poll(timeout)
if events:
for fileno, event in events:
ret = self.handle_event(fileno, event)
if ret:
return
6.3.4 注册到redis驱动,负载均衡
但是,这个 connection 还没有对 epoll 起效果,所以发送一个 _brpop_start
。
这里可以看到,是对 asynt_queue 发起了监听请求,也就是说队列有消息过来,会被响应到。
变量如下:
keys = {list: 5} ['asynt_queue', 'asynt_queue\x06\x163', 'asynt_queue\x06\x166', 'asynt_queue\x06\x169', 1]
queues = {list: 1} ['asynt_queue']
代码如下:
def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
此处有一个负载均衡需要说明:
_queue_cycle属于均衡策略,就是选择下一次哪个queue的策略,items就是具体queue列表。比如:
class round_robin_cycle:
"""Iterator that cycles between items in round-robin."""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
"""Update items from iterable."""
self.items[:] = it
def consume(self, n):
"""Consume n items."""
return self.items[:n]
_brpop_start就是启动了下一次读取,选择哪一个queue。
consume, scheduling.py:79
_brpop_start, redis.py:725
_register_BRPOP, redis.py:314
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:49
<module>, testUb.py:53
6.3.4 handle_event in MultiChannelPoller
因为已经把 file 和 poll 联系起来,所以对调用对应的响应方法,而响应方法会进行read消息。
def handle_event(self, fileno, event):
if event & READ:
return self.on_readable(fileno), self
elif event & ERR:
chan, type = self._fd_to_chan[fileno]
chan._poll_error(type)
6.3.5 on_readable in MultiChannelPoller
这里听说 Redis 已经准备好了,所以就来获取拿到的结果,然后就解析起来了,解析成功之后,自然要处理这个消息呀,于是乎又回到了这里 redis.py
:
提取fd对应的channel的响应方法如下:
def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
if chan.qos.can_consume():
chan.handlers[type]()
6.3.6 _brpop_read in Channel
前面对chan.handlers已经进行了注册。
handlers = {dict: 2}
'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
因此调用_brpop_read。
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
# iteration will reconnect automatically.
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest)
self.connection._deliver(loads(bytes_to_str(item)), dest)
return True
else:
raise Empty()
finally:
self._in_poll = None
6.3.7 从redis读取
这里会从redis驱动读取,文件/redis/connection.py,从SocketBuffer读取。
代码为:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
while not data.endswith(SYM_CRLF):
# there's more data in the socket that we need
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
# purge the buffer when we've consumed it all so it doesn't
# grow forever
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
当读到 response 之后,调用 Redis驱动中对应命令的 回调方法来处理。此处命令为BRPOP。回调方法为:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)
。
代码为:
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
raise
if command_name in self.response_callbacks:
return self.response_callbacks[command_name](response, **options)
return response
变量为:
command_name = {str} 'BRPOP'
connection = {Connection} Connection<host=localhost,port=6379,db=0>
options = {dict: 0} {}
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
connection = {Connection} Connection<host=localhost,port=6379,db=0>
connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
response_callbacks = {CaseInsensitiveDict: 179} {.
'LPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
'RPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
'SORT' = {function} <function sort_return_tuples at 0x7fbad4275f28>
'ZSCORE' = {function} <function float_or_none at 0x7fbad4276598>
'ZINCRBY' = {function} <function float_or_none at 0x7fbad4276598>
'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
....
这些代码堆栈如下:
readline, connection.py:251
read_response, connection.py:324
read_response, connection.py:739
parse_response, client.py:915
_brpop_read, redis.py:738
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53
6.3.8 回到_brpop_read
从Redis驱动获得消息后,回到了 _brpop_read,信息如下:
dest__item = {tuple: 2}
0 = {bytes: 11} b'asynt_queue'
1 = {bytes: 321} b'{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding":
6.3.9 _deliver in Transport
当获得消息之后,会取出对应queue的callback,进行调用。
变量如下:<kombu.transport.redis.Transport object at 0x7feab30f25c0>
def _deliver(self, message, queue):
try:
callback = self._callbacks[queue]
except KeyError:
logger.warning(W_NO_CONSUMERS, queue)
self._reject_inbound_message(message)
else:
callback(message)
6.3.10 basic_consume in Channel
代码继续走到 basic_consume
<kombu.transport.redis.Channel object at 0x7feab3235f28>
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
6.3.11 _receive_callback in Consumer
上文的 _callback 就是 _receive_callback in Consumer,所以这时候就调用过去。
<Consumer: [<Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>]>
def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
if accept is not None:
message.accept = accept
if message.errors:
return message._reraise_error(self.on_decode_error)
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)
最终调用用户方法。
on_message, testUb.py:36
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53
具体如下:
+----------+ +---------+ +------------------+ +------+ +---------+ +-----+ +---------+
|Connection| |Transport| |MultiChannelPoller| |_poll | | Channel | |redis| |Consumer |
+----+-----+ +------+--+ +------------+-----+ +----+-+ +-----+---+ +--+--+ +---+-----+
| | | | | | |
+ | | | | | |
drain_events | | | | | |
+ + | | | | |
+-------> drain_events | | | | |
| + + | | | |
| | +------------> get | | | |
| | + | | | |
| | + | | | |
| | _register_BRPOP | | | |
| | + + | | |
| | | +-----------> register | | |
| | | + | | |
| | + | | | |
| | poll | | | |
| | + | | | |
| | + | | | |
| | handle_event | | | |
| | + | | | |
| | + | | | |
| | on_readable | | | |
| | + | + | |
| | | +----------------->_brpop_read | |
| | | | + | |
| + | | +---------> | |
| _deliver <-------------------------------------+ | |
| + | | | | |
| | | | | | |
| | | | | | |
| | +----------------------------------> basic|consume | |
| | | | | | |
| | | | +---------> | |
| | | | | | |
| | | | | | |
| | | | | v |
| | | | | |
| | | | | _receive_ca|lback
| | | | | |
v v v v v |
v
从上图可以看出模块的用途。
手机上如图
至此,我们已经完成了 Consumer 的分析,下文我们进行 Producer 的分析。
0xFF 参考
celery 7 优秀开源项目kombu源码分析之registry和entrypoint
(二)放弃pika,选择kombu
kombu消息框架<二>
AMQP中的概念
AMQP的基本概念
深入理解AMQP协议
kombu和消息队列总结
关于epoll版服务器的理解(Python实现)
celery源码解读
Kombu源码分析(一)概述