终止挂起的redis pubsub.listen()线程
与此问题相关,我有以下代码可订阅redispubsub队列,并使用__init__中提供的处理程序将消息提供给处理它们的类:
from threading import Threadimport msgpack
class Subscriber(Thread):
def __init__(self, redis_connection, channel_name, handler):
super(Subscriber, self).__init__(name="Receiver")
self.connection = redis_connection
self.pubsub = self.connection.pubsub()
self.channel_name = channel_name
self.handler = handler
self.should_die = False
def start(self):
self.pubsub.subscribe(self.channel_name)
super(Subscriber, self).start()
def run(self):
for msg in self.pubsub.listen():
if self.should_die:
return
try:
data = msg["data"]
unpacked = msgpack.unpackb(data)
except TypeError:
# stop non-msgpacked, invalid, messages breaking stuff
# other validation happens in handler
continue
self.handler(unpacked)
def die(self):
self.should_die = True
在上面的链接问题中,请注意,pubsub.listen()
如果断开连接,则永不返回。因此,die()
尽管我的函数可以被调用,但它实际上不会导致线程终止,因为它挂在listen()
对线程内部的调用上run()
。
链接问题的可接受答案提到了黑客入侵redis-py的连接池。我真的不想这样做,并且有一个分支版本的redis-
py(至少在希望该修补程序被母版接受之前),但是无论如何我一直在看一下redis-py代码,并且不要立即看看将在哪里进行更改。
有谁知道如何彻底解决悬挂的redis-py listen()
电话吗?
我将直接使用哪些问题Thread._Thread__stop
?
回答:
这么多年后才将其关闭。它最终是redis库中的一个bug。我调试了它并提交了PR。它不应该再发生了。
以上是 终止挂起的redis pubsub.listen()线程 的全部内容, 来源链接: utcz.com/qa/428206.html