从Flask路由进行Python异步调用

我想在每次执行Flask路由时执行一个异步函数。为什么abar函数从不执行?

import asyncio

from flask import Flask

async def abar(a):

print(a)

loop = asyncio.get_event_loop()

app = Flask(__name__)

@app.route("/")

def notify():

asyncio.ensure_future(abar("abar"), loop=loop)

return "OK"

if __name__ == "__main__":

app.run(debug=False, use_reloader=False)

loop.run_forever()

我还尝试将阻塞调用放在单独的线程中。但是它仍然没有调用该abar函数。

import asyncio

from threading import Thread

from flask import Flask

async def abar(a):

print(a)

app = Flask(__name__)

def start_worker(loop):

asyncio.set_event_loop(loop)

try:

loop.run_forever()

finally:

loop.close()

worker_loop = asyncio.new_event_loop()

worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")

def notify():

asyncio.ensure_future(abar("abar"), loop=worker_loop)

return "OK"

if __name__ == "__main__":

worker.start()

app.run(debug=False, use_reloader=False)

回答:

你可以将一些异步功能整合到Flask应用中,而不必完全将其转换为异步。

import asyncio

from flask import Flask

async def abar(a):

print(a)

loop = asyncio.get_event_loop()

app = Flask(__name__)

@app.route("/")

def notify():

loop.run_until_complete(abar("abar"))

return "OK"

if __name__ == "__main__":

app.run(debug=False, use_reloader=False)

这将阻止Flask响应,直到异步函数返回为止,但是它仍然允许你做一些聪明的事情。我已经使用此模式使用aiohttp并行执行许多外部请求,然后在完成它们之后,我回到传统的flask中进行数据处理和模板渲染。

import aiohttp

import asyncio

import async_timeout

from flask import Flask

loop = asyncio.get_event_loop()

app = Flask(__name__)

async def fetch(url):

async with aiohttp.ClientSession() as session, async_timeout.timeout(10):

async with session.get(url) as response:

return await response.text()

def fight(responses):

return "Why can't we all just get along?"

@app.route("/")

def index():

# perform multiple async requests concurrently

responses = loop.run_until_complete(asyncio.gather(

fetch("https://google.com/"),

fetch("https://bing.com/"),

fetch("https://duckduckgo.com"),

fetch("http://www.dogpile.com"),

))

# do something with the results

return fight(responses)

if __name__ == "__main__":

app.run(debug=False, use_reloader=False)

以上是 从Flask路由进行Python异步调用 的全部内容, 来源链接: utcz.com/qa/434780.html

回到顶部