如何从django-celery 3任务发送Channel 2.x组消息?

我需要推迟发送频道消息。这是我的代码:

# consumers.py

class ChatConsumer(WebsocketConsumer):

def chat_message(self, event):

self.send(text_data=json.dumps(event['message']))

def connect(self):

self.channel_layer.group_add(self.room_name, self.channel_name)

self.accept()

def receive(self, text_data=None, bytes_data=None):

send_message_task.apply_async(

args=(

self.room_name,

{'type': 'chat_message',

'message': 'the message'}

),

countdown=10

)

# tasks.py

@shared_task

def send_message_task(room_name, message):

layer = get_channel_layer()

layer.group_send(room_name, message)

该任务正在执行,我看不到任何错误,但消息未发送。仅当我从消费者类方法发送时才有效。

我也尝试使用AsyncWebsocketConsumer并使用AsyncToSync(layer.group_send)发送。错误为“您不能在与异步事件循环相同的线程中使用AsyncToSync-

请直接等待异步功能。”

然后,我尝试将send_message_task声明为异步并使用await。再也没有发生任何事情(没有错误),并且我不确定该任务是否已执行。

以下是版本:

Django==1.11.13

redis==2.10.5

django-celery==3.2.2

channels==2.1.2

channels_redis==2.2.1

设定:

REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')

BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)

CHANNEL_LAYERS = {

"default": {

"BACKEND": "channels_redis.core.RedisChannelLayer",

"CONFIG": {

"hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],

},

},

}

有任何想法吗?

刚刚发现redis通道层已恢复,但是没有调用而是跳过了group_send方法。

使用AsyncToSync(layer.group_send)从控制台工作进行发送。不带调用任务apply_async也可以。但是,将其运行会apply_async导致错误You

cannot use AsyncToSync in the same thread as an async event loop - just await

the async function directly。将任务定义为异步并使用await当然也会破坏一切。

回答:

也许这不是开始问题的直接答案,但这可能会有所帮助。如果出现异常“您不能在与异步事件循环相同的线程中使用AsyncToSync-

请直接等待异步函数”,那么您可能会做出这样的选择:

  1. 事件循环在某处创建
  2. 一些ASYNC代码已启动
  3. 从ASYNC代码中调用了一些SYNC代码
  4. SYNC代码正在尝试使用AsyncToSync调用ASYNC代码,以防止出现这种情况

似乎AsyncToSync检测到外部事件循环并做出不干扰它的决定。

解决方案是将您的异步调用直接包含在外部事件循环中。下面是示例代码,但是最好是检查您的情况,并确定外部循环正在运行…

loop = asyncio.get_event_loop()

loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))

以上是 如何从django-celery 3任务发送Channel 2.x组消息? 的全部内容, 来源链接: utcz.com/qa/414841.html

回到顶部