A channel that publishes messages to a given 'name' on its own Transport.
The default name is "eggai.channel".
Connection is established lazily on the first publish or subscription.
Source code in eggai/channel.py
| class Channel:
"""
A channel that publishes messages to a given 'name' on its own Transport.
The default name is "eggai.channel".
Connection is established lazily on the first publish or subscription.
"""
def __init__(self, name: str = "eggai.channel", transport: Optional[Transport] = None):
"""
Initialize a Channel instance.
Args:
name (str): The channel (topic) name. Defaults to "eggai.channel".
transport (Optional[Transport]): A concrete transport instance. If None, a default transport is used.
"""
self._name = name
if transport is None:
self._transport = get_default_transport()
else:
self._transport = transport
self._connected = False
self._stop_registered = False
def get_name(self) -> str:
"""
Get the channel name.
Returns:
str: The channel name.
"""
return self._name
async def _ensure_connected(self):
if not self._connected:
await self._transport.connect()
self._connected = True
if not self._stop_registered:
await eggai_register_stop(self.stop)
self._stop_registered = True
async def publish(self, message: Union[Dict[str, Any], BaseModel]):
"""
Publish a message to the channel. Establishes a connection if not already connected.
Args:
message (Dict[str, Any]): The message payload to publish.
"""
await self._ensure_connected()
await self._transport.publish(self._name, message)
async def subscribe(self, callback: Callable[[Dict[str, Any]], "asyncio.Future"]):
"""
Subscribe to the channel by registering a callback to be invoked when messages are received.
Args:
callback (Callable[[Dict[str, Any]], "asyncio.Future"]): The callback to invoke on new messages.
"""
await self._transport.subscribe(self._name, callback)
await self._ensure_connected()
async def stop(self):
"""
Disconnects the channel's transport if connected.
"""
if self._connected:
await self._transport.disconnect()
self._connected = False
|
__init__(name='eggai.channel', transport=None)
Initialize a Channel instance.
Parameters:
Name |
Type |
Description |
Default |
name
|
str
|
The channel (topic) name. Defaults to "eggai.channel".
|
'eggai.channel'
|
transport
|
Optional[Transport]
|
A concrete transport instance. If None, a default transport is used.
|
None
|
Source code in eggai/channel.py
| def __init__(self, name: str = "eggai.channel", transport: Optional[Transport] = None):
"""
Initialize a Channel instance.
Args:
name (str): The channel (topic) name. Defaults to "eggai.channel".
transport (Optional[Transport]): A concrete transport instance. If None, a default transport is used.
"""
self._name = name
if transport is None:
self._transport = get_default_transport()
else:
self._transport = transport
self._connected = False
self._stop_registered = False
|
get_name()
Get the channel name.
Returns:
Name | Type |
Description |
str |
str
|
|
Source code in eggai/channel.py
| def get_name(self) -> str:
"""
Get the channel name.
Returns:
str: The channel name.
"""
return self._name
|
publish(message)
async
Publish a message to the channel. Establishes a connection if not already connected.
Parameters:
Name |
Type |
Description |
Default |
message
|
Dict[str, Any]
|
The message payload to publish.
|
required
|
Source code in eggai/channel.py
| async def publish(self, message: Union[Dict[str, Any], BaseModel]):
"""
Publish a message to the channel. Establishes a connection if not already connected.
Args:
message (Dict[str, Any]): The message payload to publish.
"""
await self._ensure_connected()
await self._transport.publish(self._name, message)
|
stop()
async
Disconnects the channel's transport if connected.
Source code in eggai/channel.py
| async def stop(self):
"""
Disconnects the channel's transport if connected.
"""
if self._connected:
await self._transport.disconnect()
self._connected = False
|
subscribe(callback)
async
Subscribe to the channel by registering a callback to be invoked when messages are received.
Parameters:
Name |
Type |
Description |
Default |
callback
|
Callable[[Dict[str, Any]], Future]
|
The callback to invoke on new messages.
|
required
|
Source code in eggai/channel.py
| async def subscribe(self, callback: Callable[[Dict[str, Any]], "asyncio.Future"]):
"""
Subscribe to the channel by registering a callback to be invoked when messages are received.
Args:
callback (Callable[[Dict[str, Any]], "asyncio.Future"]): The callback to invoke on new messages.
"""
await self._transport.subscribe(self._name, callback)
await self._ensure_connected()
|