Skip to content

PyDrocsid.pubsub

PubSubChannel

Bases: Generic[PubSubArgs, PubSubResult]

Publish-Subscribe channel for inter cog communication

Source code in PyDrocsid/pubsub.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class PubSubChannel(Generic[PubSubArgs, PubSubResult]):
    """Publish-Subscribe channel for inter cog communication"""

    def __init__(self) -> None:
        self._subscriptions: list[Callable[PubSubArgs, Awaitable[PubSubResult | None]]] = []

    async def publish(self, *args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]:
        """
        Publish a message to this channel. This will call all subscriptions and return
        a list of their return values (if they don't return None).
        """

        result = await asyncio.gather(*[sub(*args, **kwargs) for sub in self._subscriptions])
        return [r for r in result if r is not None]

    # calling the PubSubChannel object directly (like a function) also publishes a message
    __call__ = publish

    @property
    def subscribe(self) -> Type[Subscription[PubSubArgs, PubSubResult]]:
        """
        Decorator for async functions to register them as subscriptions of this channel.
        Can only be used on methods of Cog classes.
        """

        class Sub(Subscription[PubSubArgs, PubSubResult]):  # type: ignore
            channel = self

        return Sub

    def register(self, subscription: Subscription[PubSubArgs, PubSubResult]) -> None:
        self._subscriptions.append(subscription)

publish async

publish(*args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]

Publish a message to this channel. This will call all subscriptions and return a list of their return values (if they don't return None).

Source code in PyDrocsid/pubsub.py
49
50
51
52
53
54
55
56
async def publish(self, *args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]:
    """
    Publish a message to this channel. This will call all subscriptions and return
    a list of their return values (if they don't return None).
    """

    result = await asyncio.gather(*[sub(*args, **kwargs) for sub in self._subscriptions])
    return [r for r in result if r is not None]

subscribe property

subscribe() -> Type[Subscription[PubSubArgs, PubSubResult]]

Decorator for async functions to register them as subscriptions of this channel. Can only be used on methods of Cog classes.

Source code in PyDrocsid/pubsub.py
61
62
63
64
65
66
67
68
69
70
71
@property
def subscribe(self) -> Type[Subscription[PubSubArgs, PubSubResult]]:
    """
    Decorator for async functions to register them as subscriptions of this channel.
    Can only be used on methods of Cog classes.
    """

    class Sub(Subscription[PubSubArgs, PubSubResult]):  # type: ignore
        channel = self

    return Sub