打印响应部分完成Python的异步事件循环,同时还完成任务的响应

我与中信高科工作后,但我有点卡住了。我打电话给3个不同的API,每个都有自己的响应时间。打印响应部分完成Python的异步事件循环,同时还完成任务的响应

我想创建一个超时功能,它为每个任务返回一个可接受的时间。但是如果时间任务在可接受的时间内没有完成,我想返回部分数据,因为我不需要一个完整的数据集,速度更关注。

不过,我想保持未完成的任务工作,直到完成(即请求API数据插入到一个Postgres数据库。

我想知道,如果我们能做到这一点,而无需使用某种调度到保持背景中运行的任务。

回答:

但如果时间任务不是可接受的时间内完成,我想 返回部分数据,因为我并不需要一个完整的数据集和速度 更多的是关注的焦点。

但是,我想保持未完成的任务,直到完成

所以其他任务是独立于超时任务的状态,对吗?如果我正确地理解了你,你只想用他们自己的超时运行3 asyncio.Task,并在最后汇总他们的结果。

唯一可能的问题,我看到的是“想返回部分数据”,因为它很可能的事情如何组织有所不同,但我们可能只需要通过这个“部分数据”里面的任务上调超时被取消例外。

这里的小原型:

import asyncio 

class PartialData(Exception):

def __init__(self, data):

super().__init__()

self.data = data

async def api_job(i):

data = 'job {i}:'.format(i=i)

try:

await asyncio.sleep(1)

data += ' step 1,'

await asyncio.sleep(2)

data += ' step 2,'

await asyncio.sleep(2)

data += ' step 3.'

except asyncio.CancelledError as exc:

raise PartialData(data) # Pass partial data to outer code with our exception.

else:

return data

async def api_task(i, timeout):

"""Wrapper for api_job to run it with timeout and retrieve it's partial data on timeout."""

t = asyncio.ensure_future(api_job(i))

try:

await asyncio.wait_for(t, timeout)

except asyncio.TimeoutError:

try:

await t

except PartialData as exc:

return exc.data # retrieve partial data on timeout and return it.

else:

return t.result()

async def main():

# Run 3 jobs with different timeouts:

results = await asyncio.gather(

api_task(1, timeout=2),

api_task(2, timeout=4),

api_task(3, timeout=6),

)

# Print results including "partial data":

for res in results:

print(res)

if __name__ == '__main__':

loop = asyncio.get_event_loop()

try:

loop.run_until_complete(main())

finally:

loop.run_until_complete(loop.shutdown_asyncgens())

loop.close()

输出:

job 1: step 1, 

job 2: step 1, step 2,

job 3: step 1, step 2, step 3.

(你可以看到前两个作业完成了超时和检索他们的DATAS中的一部分)

UPD:

复杂的例子包含可能的解决方案不同的事件:

import asyncio 

from contextlib import suppress

async def stock1(_):

await asyncio.sleep(1)

return 'stock1 res'

async def stock2(exception_in_2):

await asyncio.sleep(1)

if exception_in_2:

raise ValueError('Exception in stock2!')

await asyncio.sleep(1)

return 'stock2 res'

async def stock3(_):

await asyncio.sleep(3)

return 'stock3 res'

async def main():

# Vary this values to see different situations:

timeout = 2.5

exception_in_2 = False

# To run all three stocks just create tasks for them:

tasks = [

asyncio.ensure_future(s(exception_in_2))

for s

in (stock1, stock2, stock3)

]

# Now we just wait until one of this possible situations happened:

# 1) Everything done

# 2) Exception occured in one of tasks

# 3) Timeout occured and at least two tasks ready

# 4) Timeout occured and less than two tasks ready

# (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait)

await asyncio.wait(

tasks,

timeout=timeout,

return_when=asyncio.FIRST_EXCEPTION

)

is_success = all(t.done() and not t.exception() for t in tasks)

is_exception = any(t.done() and t.exception() for t in tasks)

is_good_timeout = \

not is_success and \

not is_exception and \

sum(t.done() for t in tasks) >= 2

is_bad_timeout = \

not is_success and \

not is_exception and \

sum(t.done() for t in tasks) < 2

# If success, just print all results:

if is_success:

print('All done before timeout:')

for t in tasks:

print(t.result())

# If timeout, but at least two done,

# print it leaving pending task to be executing.

# But note two important things:

# 1) You should guarantee pending task done before loop closed

# 2) What if pending task will finish with error, is it ok?

elif is_good_timeout:

print('Timeout, but enought tasks done:')

for t in tasks:

if t.done():

print(t.result())

# Timeout and not enought tasks done,

# let's just cancel all hanging:

elif is_bad_timeout:

await cancel_and_retrieve(tasks)

raise RuntimeError('Timeout and not enought tasks done') # You probably want indicate fail

# If any of tasks is finished with an exception,

# we should probably cancel unfinished tasks,

# await all tasks done and retrive all exceptions to prevent warnings

# (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed)

elif is_exception:

await cancel_and_retrieve(tasks)

raise RuntimeError('Exception in one of tasks') # You probably want indicate fail

async def cancel_and_retrieve(tasks):

"""

Cancel all pending tasks, retrieve all exceptions

(https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed)

It's cleanup function if we don't want task being continued.

"""

for t in tasks:

if not t.done():

t.cancel()

await asyncio.wait(

tasks,

return_when=asyncio.ALL_COMPLETED

)

for t in tasks:

with suppress(Exception):

await t

if __name__ == '__main__':

loop = asyncio.get_event_loop()

try:

loop.run_until_complete(main())

finally:

# If some tasks still pending (is_good_timeout case),

# let's kill them:

loop.run_until_complete(

cancel_and_retrieve(asyncio.Task.all_tasks())

)

loop.run_until_complete(loop.shutdown_asyncgens())

loop.close()

以上是 打印响应部分完成Python的异步事件循环,同时还完成任务的响应 的全部内容, 来源链接: utcz.com/qa/267333.html

回到顶部