Skip to content

PyDrocsid.async_thread

gather_any async

gather_any(*coroutines: Awaitable[T]) -> tuple[int, T]

Like asyncio.gather, but returns after the first coroutine is done.

Parameters:

  • coroutines

    the coroutines to run

Returns:

  • tuple[int, T]

    a tuple containing the index of the coroutine that finished first and its result

Source code in PyDrocsid/async_thread.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def gather_any(*coroutines: Awaitable[T]) -> tuple[int, T]:
    """
    Like asyncio.gather, but returns after the first coroutine is done.

    Every awaitable is wrapped in its own task. As soon as one of them finishes
    (by returning or by raising an ``Exception``), all tasks that are still
    running are cancelled.

    :param coroutines: the coroutines to run (at least one is required)
    :return: a tuple containing the index of the coroutine that has finished and its result
    :raises ValueError: if no coroutines are given (there would be nothing to wait for)
    :raises GatherAnyError: if the first coroutine to finish raised an exception;
        carries the index of that coroutine and the exception
    """

    # Without any awaitable the event below would never be set and the call would hang forever.
    if not coroutines:
        raise ValueError("gather_any() requires at least one coroutine")

    event = Event()
    # Entries are (index, succeeded, result or exception), appended in completion order.
    result: list[tuple[int, bool, T | Exception]] = []

    async def inner(i: int, coro: Awaitable[T]) -> None:
        # Only ``Exception`` subclasses are recorded; cancellation propagates normally.
        # noinspection PyBroadException
        try:
            result.append((i, True, await coro))
        except Exception as e:
            result.append((i, False, e))
        event.set()

    tasks = [create_task(inner(i, c)) for i, c in enumerate(coroutines)]
    try:
        await event.wait()
    finally:
        # Also runs when the caller itself is cancelled while waiting,
        # so the helper tasks never outlive this call.
        for task in tasks:
            if not task.done():
                task.cancel()

    idx, ok, value = result[0]
    if not ok:
        error = cast(Exception, value)
        # Chain the original exception so its traceback is preserved.
        raise GatherAnyError(idx, error) from error

    return idx, cast(T, value)

run_as_task

run_as_task(func: Callable[..., Coroutine[Any, Any, None]]) -> Callable[..., Awaitable[None]]

Decorator for async functions. Instead of calling the decorated function directly, this will create a task for it and return immediately.

Source code in PyDrocsid/async_thread.py
50
51
52
53
54
55
56
57
58
59
60
def run_as_task(func: Callable[..., Coroutine[Any, Any, None]]) -> Callable[..., Awaitable[None]]:
    """
    Decorator for async functions.
    Instead of calling the decorated function directly, this will create a task for it and return immediately.

    The wrapper still has to be awaited (and therefore needs a running event loop),
    but it only schedules the decorated function and does not wait for it to finish.

    :param func: the async function to run in the background
    :return: an async wrapper with the same call signature that schedules ``func`` as a task
    """

    # The event loop keeps only weak references to tasks, so a fire-and-forget task
    # could be garbage-collected before it finishes. Hold a strong reference until done.
    pending: set[Awaitable[None]] = set()

    @wraps(func)
    async def inner(*args: Any, **kwargs: Any) -> None:
        task = create_task(func(*args, **kwargs))
        pending.add(task)
        # Drop the reference again once the task has completed.
        task.add_done_callback(pending.discard)

    return inner

semaphore_gather async

semaphore_gather(n: int, *tasks: Awaitable[T]) -> list[T]

Like asyncio.gather, but limited to n concurrent tasks.

Parameters:

  • n (int) –

    the maximum number of concurrent tasks

  • tasks

    the coroutines to run

Returns:

  • list[T]

    a list containing the results of all coroutines

Source code in PyDrocsid/async_thread.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
async def semaphore_gather(n: int, *tasks: Awaitable[T]) -> list[T]:
    """
    Run all given awaitables like asyncio.gather, with at most n of them running at once.

    :param n: the maximum number of concurrent tasks
    :param tasks: the coroutines to run
    :return: a list containing the results of all coroutines, in the order they were passed
    """

    limiter = Semaphore(n)

    async def run_limited(awaitable: Awaitable[T]) -> T:
        # Take a slot before starting and always give it back, even on failure.
        await limiter.acquire()
        try:
            return await awaitable
        finally:
            limiter.release()

    results = await gather(*(run_limited(task) for task in tasks))
    return list(results)