如何动态的将一个future添加到一个正在运行的asyncio模块的事件循环中?

自学asyncio模块中,问题多多~
python3.5 的 asyncio 模块中,一般都是通过 事件循环对象 的 run_until_complete(future) 来异步地运行多个协程的,参数 future 大都是由 asyncio.wait() 或 asyncio.gather() 将多个本地协程函数封装成一个future,但问题是这 “ 多个本地协程函数 ” 是一次写死的,可不可以在 run_until_complete 运行过程中动态添加呢?或者用其他方法来达成类似效果?

比如在处理socket模块那典型的C/S模型的服务端的时候 :

#! /usr/bin/env python3

# -*- coding:utf-8 -*-

# 一个未完成的 异步echo-server

import asyncio,socket,queue

server = socket.socket()

server.bind(('0.0.0.0',12345))

server.listen()

client_queue = queue.Queue()

async def accept():

return server.accept()

async def get_client_socket():

while 1:

client, addr = await accept()

print('%s has connected'%str(addr))

client_queue.put((client,addr))

async def send(sock,data):

sock.send(data)

async def recv(sock):

return sock.recv(1024)

async def interactive():

client, addr = client_queue.get()

while 1:

receive = await recv(client)

print('Received %r from %r'%(receive,addr))

await send(client,receive)

if __name__ == '__main__':

loop = asyncio.get_event_loop()

'''

如果写

loop.run_until_complete(asyncio.gather(

get_client_socket(),

interactive()

))

肯定不对...

到底该怎么实现“server 不断生产新的负责与客户端连接的socket”的get_client_socket协程和“每个‘负责与客户端连接的socket’ 与客户端交互”的interactive协程之间的并行呢?

'''

如上例,要如何实现
“ server 不断生成新的负责与客户端连接的 socket” 的 get_client_socket协程

“每个‘负责与客户端连接的socket’ 与客户端交互”的 interactive协程
之间的异步并行呢?


看了某大神的文章(http://python.jobbole.com/873...,试着修改了下

#! /usr/bin/env python3

# -*- coding:utf-8 -*-

# 一个有毛病的 异步echo-server

import asyncio,threading,socket

server = socket.socket()

server.bind(('127.0.0.1',12345))

server.listen()

async def recv_data(conn):

return conn.recv(1024)

async def send_data(conn,data):

conn.send(data)

async def interactive(conn,addr):

while 1:

receive = await recv_data(conn)

print('Received %s from %s'%(receive.decode('utf-8'),repr(addr)))

await send_data(conn,receive)

def start_loop(loop):

asyncio.set_event_loop(loop)

loop.run_forever()

new_loop = asyncio.new_event_loop()

t = threading.Thread(target=start_loop,args=(new_loop,))

t.start()

while 1:

conn,addr = server.accept()

print('%s has connected'%repr(addr))

asyncio.run_coroutine_threadsafe(interactive(conn,addr),new_loop)

print('New_loop\'s tasks num:',len(asyncio.Task.all_tasks(new_loop)))

简化版的客户端:

#! /usr/bin/env python3

# -*- coding:utf-8 -*-

# 简化版 echo-client

import socket

client = socket.socket()

client.connect(('127.0.0.1',12345))

while 1:

msg = input('>> ').strip()

client.send(msg.encode())

receive = client.recv(1024)

print('Received:',receive.decode('utf-8'))

启动一个服务端,启动3个客户端,3个客户端分别向服务端发送消息,服务端的回显如下:图片描述

问题:
如图所示,为什么只有1个客户端可以与服务端交互?

回答:

看这里
查找
不同线程的事件循环
希望能帮到你

回答:

也是一个刚学的新手,不知道对不对。如果没理解错,楼主是和我一样想在async的函数在运行的时候往loop里动态添加新的Task。答案应该就是在楼上链接里面 “协程嵌套” 那章,在async的main里添加了新的Task


PS:发现答错了,协程嵌套的新协程和main不是同一级的协程,要等到所有新协程执行完才会回到main...看来只有楼上的办法了,在另一个线程里新增协程才不会await


PSS:尝试了下,似乎可行的方案是:在loop开始之前用asyncio.ensure_future()新增协程,紧接着loop.run_forever(),在协程里如果想添加新的协程,同样用asyncio.ensure_future()新增协程,因为没有使用await,所以这样就不会挂起新协程了。
如果想停掉loop,那就把loop传入到协程内,调用loop.stop()

回答:

正好也遇到了这个问题, 花了老长时间解决, 希望对其他人有所帮助.

关于楼主的echo server只能与单个客户端通信的问题, 主要有两点:

  1. 设置套接字为非阻塞模式
  2. 自定义的accept, recvsend协程函数应该还是差点东西.

我后来找到了这篇文章从 asyncio 简单实现看异步是如何工作的, 它用loop.create_task()函数动态添加任务到事件循环, 然后用asyncio内置的底层异步协程函数sock_accept, sock_recv, sock_send完成echo server.

这是我写的小demo 协程模型回显服务器, 如果觉得对你帮助, 顺便帮我点个星, 谢谢.

回答:

其实是动态的向一个已经在运行的LOOP中怎么添加任务的问题吧.
我这个例子你试试看.
在主线程中运行一个LOOP, 然后开10个子线程,获取主线程中的 queue 的数据.
当然你也可以反着写.
asyncio.run_coroutine_threadsafe
loop.call_soon_threadsafe

你值得拥有

PS :PYTHON3.7

import asyncio

import threading

import random

async def fn(name,qu):

while True:

print(name,"ready to work")

r=asyncio.run_coroutine_threadsafe(qu.get(),qu._loop)

rs=r.result()

print(name,"get work,working",rs)

if rs=="over":

asyncio.run_coroutine_threadsafe(qu.put("over"),qu._loop)

print(name,"over")

await asyncio.sleep(random.randint(1,3))

print(name,"done,sleep")

await asyncio.sleep(random.randint(1,3))

print(name,"wake up")

if rs=="over":

break

def t(name,qu):

loop=asyncio.new_event_loop()

asyncio.set_event_loop(loop)

loop.run_until_complete(fn(name,qu))

loop=asyncio.new_event_loop()

asyncio.set_event_loop(loop)

qu=asyncio.Queue()

tl=[threading.Thread(target=t,args=(i,qu)) for i in range(5)]

for i in tl:

i.setDaemon(True)

i.start()

async def pt(qu):

for i in range(20):

await asyncio.sleep(0.8)

await qu.put(i)

print("work need to do",qu)

await qu.put("over")

await asyncio.sleep(5)

loop.run_until_complete(pt(qu))

loop.close()

回答:

import asyncio 

tq = asyncio.Queue()

async def forever():

while True:

task = await tq.get()

await task[0](task[1])

async def funcname(param):

a = await asyncio.sleep(1)

await tq.put((funcname,param))

print(a)

for i in range(100):

tq.put((funcname,tq))

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait([forever() for i in range(1000)]))

loop.close()

把协程加入一个已经开始运行的循环可以这样写,把协程加入队列,再由别的协程取出执行,如果需要返回值就把返回值放入另一个队列

以上是 如何动态的将一个future添加到一个正在运行的asyncio模块的事件循环中? 的全部内容, 来源链接: utcz.com/a/159751.html

回到顶部