消息队列Kombu之Producer源码分析

producer">目录

  • [源码分析] 消息队列 Kombu 之 Producer

    • 0x00 摘要
    • 0x01 示例代码
    • 0x02 来由
    • 0x03 建立

      • 3.1 定义
      • 3.2 init

        • 3.2.1 转换channel

    • 0x04 发送

      • 4.1 组装消息 in channel
      • 4.2 发送消息 in channel
      • 4.3 deliver in exchange
      • 4.4 binding 转换
      • 4.5 _put in channel

    • 0x05 总结
    • 0xFF 参考

0x00 摘要

本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Producer 概念。

0x01 示例代码

下面使用如下代码来进行说明。

本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

def main(arguments):

hub = Hub()

exchange = Exchange('asynt_exchange')

queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

def send_message(conn):

producer = Producer(conn)

producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')

print('message sent')

def on_message(message):

print('received: {0!r}'.format(message.body))

message.ack()

# hub.stop() # <-- exit after one message

conn = Connection('redis://localhost:6379')

conn.register_with_event_loop(hub)

def p_message():

print(' kombu ')

with Consumer(conn, [queue], on_message=on_message):

send_message(conn)

hub.timer.call_repeatedly(3, p_message)

hub.run_forever()

if __name__ == '__main__':

sys.exit(main(sys.argv[1:]))

0x02 来由

前文已经完成了构建部分,Consumer部分,下面来到了Producer部分,即如下代码:

def send_message(conn):

producer = Producer(conn)

producer.publish('hello world', exchange=exchange, routing_key='asynt')

print('message sent')

我们知道,Transport需要把Channel与文件信息联系起来,但是此时Transport信息如下,文件信息依然没有,这是我们以后需要留意的

transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>

Channel = {type} <class 'kombu.transport.redis.Channel'>

Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>

Management = {type} <class 'kombu.transport.virtual.base.Management'>

channel_max = {int} 65535

channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]

client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>

cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>

after_read = {set: 0} set()

eventflags = {int} 25

fds = {dict: 0} {}

poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>

default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}

default_port = {int} 6379

driver_name = {str} 'redis'

driver_type = {str} 'redis'

implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}

manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>

polling_interval = {NoneType} None

state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>

0x03 建立

3.1 定义

Producer中,主要变量是:

  • _channel :就是channel;
  • exchange :exchange;

但是本文示例没有传入exchange,这就有些奇怪,我们需要继续看看

class Producer:

"""Message Producer.

Arguments:

channel (kombu.Connection, ChannelT): Connection or channel.

exchange (kombu.entity.Exchange, str): Optional default exchange.

routing_key (str): Optional default routing key.

"""

#: Default exchange

exchange = None

#: Default routing key.

routing_key = ''

#: Default serializer to use. Default is JSON.

serializer = None

#: Default compression method. Disabled by default.

compression = None

#: By default, if a defualt exchange is set,

#: that exchange will be declare when publishing a message.

auto_declare = True

#: Basic return callback.

on_return = None

#: Set if channel argument was a Connection instance (using

#: default_channel).

__connection__ = None

3.2 init

init代码如下。

def __init__(self, channel, exchange=None, routing_key=None,

serializer=None, auto_declare=None, compression=None,

on_return=None):

self._channel = channel

self.exchange = exchange

self.routing_key = routing_key or self.routing_key

self.serializer = serializer or self.serializer

self.compression = compression or self.compression

self.on_return = on_return or self.on_return

self._channel_promise = None

if self.exchange is None:

self.exchange = Exchange('')

if auto_declare is not None:

self.auto_declare = auto_declare

if self._channel:

self.revive(self._channel)

3.2.1 转换channel

这里有个重要转换。

  • 最开始是把输入参数 Connection 赋值到 self._channel。
  • 然后 revive 方法做了转换为 channel,即 self._channel 最终是 channel 类型。

但是 exchange 依然没有意义,是 direct 类型。

代码如下:

def revive(self, channel):

"""Revive the producer after connection loss."""

if is_connection(channel):

connection = channel

self.__connection__ = connection

channel = ChannelPromise(lambda: connection.default_channel)

if isinstance(channel, ChannelPromise):

self._channel = channel

self.exchange = self.exchange(channel)

else:

# Channel already concrete

self._channel = channel

if self.on_return:

self._channel.events['basic_return'].add(self.on_return)

self.exchange = self.exchange(channel)

此时变量为:

producer = {Producer}

auto_declare = {bool} True

channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>

compression = {NoneType} None

connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>

exchange = {Exchange} Exchange ''(direct)

on_return = {NoneType} None

routing_key = {str} ''

serializer = {NoneType} None

逻辑如图:

+----------------------+               +-------------------+

| Producer | | Channel |

| | | | +-----------------------------------------------------------+

| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |

| channel +------------------> | | +-----------------------------------------------------------+

| | | pool |

| exchange | +---------> | | <------------------------------------------------------------+

| | | | | |

| connection | | +----> | connection +---------------+ |

| + | | | | | | |

+--+-------------------+ | | +-------------------+ | |

| | | | v |

| | | | +-------------------+ +---+-----------------+ +--------------------+ |

| | | | | Connection | | redis.Transport | | MultiChannelPoller | |

| +----------------------> | | | | | | |

| | | | | | | | _channels +--------+

| | | | | | cycle +------------> | _fd_to_chan |

| | | | transport +---------> | | | _chan_to_sock |

| +-------->+ | | | | | +------+ poller |

| | | +-------------------+ +---------------------+ | | after_read |

| | | | | |

| | | | +--------------------+

| | | +------------------+ +---------------+

| | | | Hub | |

| | | | | v

| | | | | +------+------+

| | | | poller +---------------> | _poll |

| publish | | | | | | +-------+

+--------------------------------+ | | | _poller+---------> | poll |

| | | +------------------+ | | +-------+

| | | +-------------+

+-------------------+ | +-----> +----------------+

| Queue | | | | Exchange |

| _channel | +---------+ | |

| | | |

| exchange +--------------------> | channel |

| | | |

| | | |

+-------------------+ +----------------+

手机如图:

[源码分析] 消息队列 Kombu 之 Producer

0x04 发送

发送消息是通过producer.publish完成。

def send_message(conn):

producer = Producer(conn)

producer.publish('hello world', exchange=exchange, routing_key='asynt')

print('message sent')

此时传入exchange作为参数。原来如果没有 Exchange,是可以在这里进行补救

producer.publish继续调用到如下,可以看到分为两步:

  • 调用channel的组装消息函数prepare_message
  • 调用channel的发送消息basic_publish

因此,最终发送消息还是通过channel完成。

def _publish(self, body, priority, content_type, content_encoding,

headers, properties, routing_key, mandatory,

immediate, exchange, declare):

channel = self.channel

message = channel.prepare_message(

body, priority, content_type,

content_encoding, headers, properties,

)

if declare:

maybe_declare = self.maybe_declare

[maybe_declare(entity) for entity in declare]

# handle autogenerated queue names for reply_to

reply_to = properties.get('reply_to')

if isinstance(reply_to, Queue):

properties['reply_to'] = reply_to.name

return channel.basic_publish(

message,

exchange=exchange, routing_key=routing_key,

mandatory=mandatory, immediate=immediate,

)

4.1 组装消息 in channel

channel 的组装消息函数prepare_message完成组装功能,基本上是为消息添加各种属性。

def prepare_message(self, body, priority=None, content_type=None,

content_encoding=None, headers=None, properties=None):

"""Prepare message data."""

properties = properties or {}

properties.setdefault('delivery_info', {})

properties.setdefault('priority', priority or self.default_priority)

return {'body': body,

'content-encoding': content_encoding,

'content-type': content_type,

'headers': headers or {},

'properties': properties or {}}

消息如下:

message = {dict: 5}

'body' = {str} 'aGVsbG8gd29ybGQ='

'content-encoding' = {str} 'utf-8'

'content-type' = {str} 'text/plain'

'headers' = {dict: 0} {}

__len__ = {int} 0

'properties' = {dict: 5}

'delivery_mode' = {int} 2

'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}

'priority' = {int} 0

'body_encoding' = {str} 'base64'

'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'

__len__ = {int} 5

4.2 发送消息 in channel

channel的发送消息basic_publish完成发送功能。此时使用了传入的参数exchange。

发送消息basic_publish方法是调用_put方法:

def basic_publish(self, message, exchange, routing_key, **kwargs):

"""Publish message."""

self._inplace_augment_message(message, exchange, routing_key)

if exchange:

return self.typeof(exchange).deliver(

message, exchange, routing_key, **kwargs

)

# anon exchange: routing_key is the destination queue

return self._put(routing_key, message, **kwargs)

4.3 deliver in exchange

self.typeof(exchange).deliver代码接着来到exchange。本文是DirectExchange。

注意,这里用到了self.channel._put。就是Exchange的成员变量channel。

class DirectExchange(ExchangeType):

"""Direct exchange.

The `direct` exchange routes based on exact routing keys.

"""

type = 'direct'

def lookup(self, table, exchange, routing_key, default):

return {

queue for rkey, _, queue in table

if rkey == routing_key

}

def deliver(self, message, exchange, routing_key, **kwargs):

_lookup = self.channel._lookup

_put = self.channel._put

for queue in _lookup(exchange, routing_key):

_put(queue, message, **kwargs)

4.4 binding 转换

我们知道,Exchange的作用只是将发送的 routing_key 转化为 queue 的名字。这样发送就知道发到哪个 queue

因此依据_lookup方法得到对应的queue

def _lookup(self, exchange, routing_key, default=None):

"""Find all queues matching `routing_key` for the given `exchange`.

Returns:

str: queue name -- must return the string `default`

if no queues matched.

"""

if default is None:

default = self.deadletter_queue

if not exchange: # anon exchange

return [routing_key or default]

try:

R = self.typeof(exchange).lookup(

self.get_table(exchange),

exchange, routing_key, default,

)

except KeyError:

R = []

if not R and default is not None:

warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(

exchange=exchange, routing_key=routing_key)),

)

self._new_queue(default)

R = [default]

return R

此处具体逻辑为:

第一,调用到channel的方法。这里的 exchange 名字为 asynt_exchange。

def get_table(self, exchange):

key = self.keyprefix_queue % exchange

with self.conn_or_acquire() as client:

values = client.smembers(key)

if not values:

raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))

return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

我们看看Redis内容,发现集合内容如下:

127.0.0.1:6379> smembers _kombu.binding.asynt_exchange

1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"

第二,因此得到对应binding为:

{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。

逻辑如下:

                                  +---------------------------------+

| exchange |

| |

1 routing_key x | |

+----------+ | | +------------+

| Producer | +-----------------> | routing_key x ---> queue x | | Consumer |

+--------+-+ | | +------------+

| | routing_key y ---> queue y |

| | | ^

| | routing_key z ---> queue z | |

| | | |

| +---------------------------------+ |

| |

| |

| |

| |

| |

| |

| |

| |

| +-----------+ |

| 2 message | | 3 message |

+-------------------------------> | queue X | +--------------------+

| |

+-----------+

4.5 _put in channel

channel的_put 方法被用来继续处理,可以看到其最终调用到了client.lpush。

client为:

Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

代码为:

def _put(self, queue, message, **kwargs):

"""Deliver message."""

pri = self._get_message_priority(message, reverse=False)

with self.conn_or_acquire() as client:

client.lpush(self._q_for_pri(queue, pri), dumps(message))

redis怎么区别不同的queue?

实际是每个 queue 被赋予一个字符串 name,这个 name 就是 redis 对应的 list 的 key。知道应该向哪个 list 放消息,后续就是向此 list 中 lpush 消息。

如下方法完成转换功能。

def _q_for_pri(self, queue, pri):

pri = self.priority(pri)

if pri:

return f"{queue}{self.sep}{pri}"

return queue

现在发消息之后,redis内容如下,我们可以看出来,消息作为list 的item,放入到之中。

127.0.0.1:6379> lrange asynt_queue 0 -1

1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"

127.0.0.1:6379>

0x05 总结

现在我们总结如下:

  • Producers: 发送消息的抽象类;
  • Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息;
  • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列;
  • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
  • Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连,就是真实redis连接;

于是逻辑链已经形成,大约是这样的:

  • Producer的publish方法接受参数Exchange,于是就发送消息到此Exchange;
  • Producer调用channel的组装消息函数prepare_message为消息添加各种属性;
  • Producer调用channel的发送消息basic_publish发送消息,此时使用了传入的参数exchange。
  • basic_publish方法调用exchange.deliver(exchange, routing_key)来发送消息;
  • Exchange中有成员变量Channel,也有成员变量Queues,每个queue对应一个routing_key;
  • deliver使用_lookup方法依据key得到对应的queue;
  • deliver使用Exchange成员变量Channel的_put方法来向queue中投放消息;
  • Channel拿到自己的redis连接池,即client为Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>;于是可以基于此进行redis操作;
  • redis怎么区别不同的queue,实际是每个queue被赋予一个字符串name,这就是redis对应的list的key;
  • 既然得到了名字为queue的list,则向此list中lpush消息。
  • Consumer去Queue取消息;

动态逻辑如下:

       +------------+                        +------------+               +------------+      +-----------------------+

| producer | | channel | | exchange | | Redis<ConnectionPool> |

+---+--------+ +----+-------+ +-------+----+ +----------+------------+

| | | |

| | | |

publish('', exchange, routing_key) | | |

| | | |

| prepare_message | | |

| | | |

| +----------------------------------> | | |

| | | |

| basic_publish (exchange, routing_key)| | |

| | | |

| +----------------------------------> | | |

| | | |

| | deliver(exchange, routing_key)| |

| | | |

| +-----------------------------> | |

| | | |

| | | |

| | _lookup(exchange, routing_key) |

| | | |

| | | |

| | _put(queue, message) | |

| v | |

| | <---------------------------+ | |

| | | |

| _q_for_pri(queue, pri) | |

| + | |

v | | |

| | client.lpush | |

| | +--------------------------------------------------> |

| | | |

v v v v

手机如下:

[源码分析] 消息队列 Kombu 之 Producer

0xFF 参考

celery 7 优秀开源项目kombu源码分析之registry和entrypoint

放弃pika,选择kombu

kombu消息框架<二>

AMQP中的概念

AMQP的基本概念

深入理解AMQP协议

kombu和消息队列总结

关于epoll版服务器的理解(Python实现)

以上是 消息队列Kombu之Producer源码分析 的全部内容, 来源链接: utcz.com/a/122470.html

回到顶部