如何动态的将一个future添加到一个正在运行的asyncio模块的事件循环中?
自学asyncio模块中,问题多多~
python3.5 的 asyncio 模块中,一般都是通过 事件循环对象 的 run_until_complete(future) 来异步地运行多个协程的,参数 future 大都是由 asyncio.wait() 或 asyncio.gather() 将多个本地协程函数封装成一个future,但问题是这 “ 多个本地协程函数 ” 是一次写死的,可不可以在 run_until_complete 运行过程中动态添加呢?或者用其他方法来达成类似效果?
比如在处理socket模块那典型的C/S模型的服务端的时候 :
#! /usr/bin/env python3# -*- coding:utf-8 -*-
# 一个未完成的 异步echo-server
import asyncio,socket,queue
server = socket.socket()
server.bind(('0.0.0.0',12345))
server.listen()
client_queue = queue.Queue()
async def accept():
return server.accept()
async def get_client_socket():
while 1:
client, addr = await accept()
print('%s has connected'%str(addr))
client_queue.put((client,addr))
async def send(sock,data):
sock.send(data)
async def recv(sock):
return sock.recv(1024)
async def interactive():
client, addr = client_queue.get()
while 1:
receive = await recv(client)
print('Received %r from %r'%(receive,addr))
await send(client,receive)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
'''
如果写
loop.run_until_complete(asyncio.gather(
get_client_socket(),
interactive()
))
肯定不对...
到底该怎么实现“server 不断生产新的负责与客户端连接的socket”的get_client_socket协程和“每个‘负责与客户端连接的socket’ 与客户端交互”的interactive协程之间的并行呢?
'''
如上例,要如何实现
“ server 不断生成新的负责与客户端连接的 socket” 的 get_client_socket协程
和
“每个‘负责与客户端连接的socket’ 与客户端交互”的 interactive协程
之间的异步并行呢?
看了某大神的文章(http://python.jobbole.com/873...,试着修改了下
#! /usr/bin/env python3# -*- coding:utf-8 -*-
# 一个有毛病的 异步echo-server
import asyncio,threading,socket
server = socket.socket()
server.bind(('127.0.0.1',12345))
server.listen()
async def recv_data(conn):
return conn.recv(1024)
async def send_data(conn,data):
conn.send(data)
async def interactive(conn,addr):
while 1:
receive = await recv_data(conn)
print('Received %s from %s'%(receive.decode('utf-8'),repr(addr)))
await send_data(conn,receive)
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop,args=(new_loop,))
t.start()
while 1:
conn,addr = server.accept()
print('%s has connected'%repr(addr))
asyncio.run_coroutine_threadsafe(interactive(conn,addr),new_loop)
print('New_loop\'s tasks num:',len(asyncio.Task.all_tasks(new_loop)))
简化版的客户端:
#! /usr/bin/env python3# -*- coding:utf-8 -*-
# 简化版 echo-client
import socket
client = socket.socket()
client.connect(('127.0.0.1',12345))
while 1:
msg = input('>> ').strip()
client.send(msg.encode())
receive = client.recv(1024)
print('Received:',receive.decode('utf-8'))
启动一个服务端,启动3个客户端,3个客户端分别向服务端发送消息,服务端的回显如下:
问题:
如图所示,为什么只有1个客户端可以与服务端交互?
回答:
看这里
查找不同线程的事件循环
希望能帮到你
回答:
也是一个刚学的新手,不知道对不对。如果没理解错,楼主是和我一样想在async的函数在运行的时候往loop里动态添加新的Task。答案应该就是在楼上链接里面 “协程嵌套” 那章,在async的main里添加了新的Task
PS:发现答错了,协程嵌套的新协程和main不是同一级的协程,要等到所有新协程执行完才会回到main...看来只有楼上的办法了,在另一个线程里新增协程才不会await
PSS:尝试了下,似乎可行的方案是:在loop开始之前用asyncio.ensure_future()新增协程,紧接着loop.run_forever(),在协程里如果想添加新的协程,同样用asyncio.ensure_future()新增协程,因为没有使用await,所以这样就不会挂起新协程了。
如果想停掉loop,那就把loop传入到协程内,调用loop.stop()
回答:
正好也遇到了这个问题, 花了老长时间解决, 希望对其他人有所帮助.
关于楼主的echo server
只能与单个客户端通信的问题, 主要有两点:
- 设置套接字为非阻塞模式
- 自定义的
accept
,recv
与send
协程函数应该还是差点东西.
我后来找到了这篇文章从 asyncio 简单实现看异步是如何工作的, 它用loop.create_task()
函数动态添加任务到事件循环, 然后用asyncio
内置的底层异步协程函数sock_accept
, sock_recv
, sock_send
完成echo server
.
这是我写的小demo 协程模型回显服务器, 如果觉得对你帮助, 顺便帮我点个星, 谢谢.
回答:
其实是动态的向一个已经在运行的LOOP中怎么添加任务的问题吧.
我这个例子你试试看.
在主线程中运行一个LOOP, 然后开10个子线程,获取主线程中的 queue 的数据.
当然你也可以反着写.
asyncio.run_coroutine_threadsafe
loop.call_soon_threadsafe
你值得拥有
PS :PYTHON3.7
import asyncioimport threading
import random
async def fn(name,qu):
while True:
print(name,"ready to work")
r=asyncio.run_coroutine_threadsafe(qu.get(),qu._loop)
rs=r.result()
print(name,"get work,working",rs)
if rs=="over":
asyncio.run_coroutine_threadsafe(qu.put("over"),qu._loop)
print(name,"over")
await asyncio.sleep(random.randint(1,3))
print(name,"done,sleep")
await asyncio.sleep(random.randint(1,3))
print(name,"wake up")
if rs=="over":
break
def t(name,qu):
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(fn(name,qu))
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
qu=asyncio.Queue()
tl=[threading.Thread(target=t,args=(i,qu)) for i in range(5)]
for i in tl:
i.setDaemon(True)
i.start()
async def pt(qu):
for i in range(20):
await asyncio.sleep(0.8)
await qu.put(i)
print("work need to do",qu)
await qu.put("over")
await asyncio.sleep(5)
loop.run_until_complete(pt(qu))
loop.close()
回答:
import asyncio tq = asyncio.Queue()
async def forever():
while True:
task = await tq.get()
await task[0](task[1])
async def funcname(param):
a = await asyncio.sleep(1)
await tq.put((funcname,param))
print(a)
for i in range(100):
tq.put((funcname,tq))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([forever() for i in range(1000)]))
loop.close()
把协程加入一个已经开始运行的循环可以这样写,把协程加入队列,再由别的协程取出执行,如果需要返回值就把返回值放入另一个队列
以上是 如何动态的将一个future添加到一个正在运行的asyncio模块的事件循环中? 的全部内容, 来源链接: utcz.com/a/159751.html