为什么 kombu 的 ConsumerMixin 消费阻塞了?

from kombu.mixins import ConsumerMixin

from kombu import Connection, Exchange, Queue

from loguru import logger

class MyConsumer(ConsumerMixin):

def __init__(self, connection):

self.connection = connection

def get_consumers(self, Consumer, channel):

print('创建消费者 start')

queue_name = 'evt-ye.events-take--dna_create_service.auth'

exchange_name = 'ye.events'

routing_key = 'take'

exchange = Exchange(exchange_name, type='topic')

queue = Queue(

queue_name, exchange=exchange,

routing_key=routing_key,

queue_arguments={'x-max-priority': 10}

)

# 创建一个消费者,并设置预取消息数量为10

consumer = Consumer(

queues=[queue], callbacks=[self.on_message],

prefetch_count=10

)

print('创建消费者 down')

return [consumer]

def on_message(self, body, message):

logger.debug(f"Received message: {body}")

with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:

consumer = MyConsumer(conn)

consumer.run()

上面的代码运行后输出

╰─➤  python -u "/Users/ponponon/Desktop/code/me/pon_example/demo.py"                                                                                                                                                                                                      130 ↵

创建消费者 start

创建消费者 down

2023-07-07 14:21:29.154 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

2023-07-07 14:21:29.156 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈

然后就没有然后了,程序没有退出,一直阻塞着

为什么 kombu 的 ConsumerMixin 消费阻塞了?

从 rabbitmq 的监控面板看,也一直出于阻塞状态

为什么 kombu 的 ConsumerMixin 消费阻塞了?

用 wireshark 抓包看,也没有回复 ack

为什么 ?


回答:

def on_message(self, body, message):

logger.debug(f"Received message: {body}")

message.ack() # acknowledge the message

以上是 为什么 kombu 的 ConsumerMixin 消费阻塞了? 的全部内容, 来源链接: utcz.com/p/938939.html

回到顶部