Skip to content

Transport

Bases: ABC

Abstract base for any transport. It should manage publishing, subscribing, connecting, and disconnecting.

Source code in eggai/transport/base.py
class Transport(ABC):
    """
    Interface that every transport implementation has to fulfil.

    A transport is responsible for four operations: connecting,
    disconnecting, publishing messages and subscribing to channels.
    All four are abstract coroutines, so this class cannot be
    instantiated until a subclass overrides each of them.
    """

    @abstractmethod
    async def connect(self):
        """
        Open the connection to the underlying system.
        """
        ...

    @abstractmethod
    async def disconnect(self):
        """
        Shut the transport connection down cleanly.
        """
        ...

    @abstractmethod
    async def publish(
        self,
        channel: str,
        message: Union[Dict[str, Any], BaseModel],
    ):
        """
        Send ``message`` to ``channel``.

        The message is annotated as either a plain dictionary or a
        ``BaseModel`` instance.
        """
        ...

    @abstractmethod
    async def subscribe(
        self,
        channel: str,
        callback: Callable[[Dict[str, Any]], "asyncio.Future"],
        **kwargs,
    ) -> Callable:
        """
        Register ``callback`` to be invoked for new messages on ``channel``.
        (No-op if a consumer does not exist.)

        Extra keyword arguments are accepted; this base class does not
        define their meaning, so it is left to each implementation.
        """
        ...

connect() abstractmethod async

Connect to the underlying system.

Source code in eggai/transport/base.py
@abstractmethod
async def connect(self):
    """
    Open the connection to the underlying system.

    Declared abstract: the body here does nothing and is meant to be
    overridden.
    """
    ...

disconnect() abstractmethod async

Cleanly disconnect from the transport.

Source code in eggai/transport/base.py
@abstractmethod
async def disconnect(self):
    """
    Shut the transport connection down cleanly.

    Declared abstract: the body here does nothing and is meant to be
    overridden.
    """
    ...

publish(channel, message) abstractmethod async

Publish the given message to the channel.

Source code in eggai/transport/base.py
@abstractmethod
async def publish(
    self,
    channel: str,
    message: Union[Dict[str, Any], BaseModel],
):
    """
    Send ``message`` to ``channel``.

    The message is annotated as either a plain dictionary or a
    ``BaseModel`` instance. Declared abstract: the body here does
    nothing and is meant to be overridden.
    """
    ...

subscribe(channel, callback, **kwargs) abstractmethod async

Subscribe to a channel with the given callback, invoked on new messages. (No-op if a consumer doesn’t exist.)

Source code in eggai/transport/base.py
@abstractmethod
async def subscribe(
    self,
    channel: str,
    callback: Callable[[Dict[str, Any]], "asyncio.Future"],
    **kwargs,
) -> Callable:
    """
    Register ``callback`` to be invoked for new messages on ``channel``.
    (No-op if a consumer does not exist.)

    Extra keyword arguments are accepted; their meaning is not defined
    here and is left to each implementation. Declared abstract: the body
    here does nothing and is meant to be overridden.
    """
    ...