Skip to content

Transport

Bases: ABC

Abstract base for any transport. It should manage publishing, subscribing, connecting, and disconnecting.

Source code in eggai/transport/base.py
class Transport(ABC):
    """
    Abstract base for any transport. It should manage publishing,
    subscribing, connecting, and disconnecting.
    """

    @abstractmethod
    async def connect(self, group_id: Optional[str] = None):
        """
        Connect to the underlying system.
        If group_id is None, no consumer should be created (publish-only).
        """
        pass

    @abstractmethod
    async def disconnect(self):
        """
        Cleanly disconnect from the transport.
        """
        pass

    @abstractmethod
    async def publish(self, channel: str, message: Dict[str, Any]):
        """
        Publish the given message to the channel.
        """
        pass

    @abstractmethod
    async def subscribe(
            self, channel: str, callback: Callable[[Dict[str, Any]], "asyncio.Future"]
    ):
        """
        Subscribe to a channel with the given callback, invoked on new messages.
        (No-op if a consumer doesn’t exist.)
        """
        pass

connect(group_id=None) abstractmethod async

Connect to the underlying system. If group_id is None, no consumer should be created (publish-only).

Source code in eggai/transport/base.py
@abstractmethod
async def connect(self, group_id: Optional[str] = None):
    """
    Connect to the underlying system.
    If group_id is None, no consumer should be created (publish-only).
    """
    pass

disconnect() abstractmethod async

Cleanly disconnect from the transport.

Source code in eggai/transport/base.py
@abstractmethod
async def disconnect(self):
    """
    Cleanly disconnect from the transport.
    """
    pass

publish(channel, message) abstractmethod async

Publish the given message to the channel.

Source code in eggai/transport/base.py
@abstractmethod
async def publish(self, channel: str, message: Dict[str, Any]):
    """
    Publish the given message to the channel.
    """
    pass

subscribe(channel, callback) abstractmethod async

Subscribe to a channel with the given callback, invoked on new messages. (No-op if a consumer doesn’t exist.)

Source code in eggai/transport/base.py
@abstractmethod
async def subscribe(
        self, channel: str, callback: Callable[[Dict[str, Any]], "asyncio.Future"]
):
    """
    Subscribe to a channel with the given callback, invoked on new messages.
    (No-op if a consumer doesn’t exist.)
    """
    pass