为什么 kombu 的 ConsumerMixin 消费阻塞了?
from kombu.mixins import ConsumerMixinfrom kombu import Connection, Exchange, Queue
from loguru import logger
class MyConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection
    def get_consumers(self, Consumer, channel):
        print('创建消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'
        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name, exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10}
        )
        # 创建一个消费者,并设置预取消息数量为10
        consumer = Consumer(
            queues=[queue], callbacks=[self.on_message],
            prefetch_count=10
        )
        print('创建消费者 down')
        return [consumer]
    def on_message(self, body, message):
        logger.debug(f"Received message: {body}")
with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()
上面的代码运行后输出
╰─➤  python -u "/Users/ponponon/Desktop/code/me/pon_example/demo.py"                                                                                                                                                                                                      130 ↵创建消费者 start
创建消费者 down
2023-07-07 14:21:29.154 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.156 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
然后就没有然后了,程序没有退出,一直阻塞着

从 rabbitmq 的监控面板看,也一直出于阻塞状态

用 wireshark 抓包看,也没有回复 ack
为什么 ?
回答:
def on_message(self, body, message):    logger.debug(f"Received message: {body}")
    message.ack()  # acknowledge the message
以上是 为什么 kombu 的 ConsumerMixin 消费阻塞了? 的全部内容, 来源链接: utcz.com/p/938939.html


