KafkaTransport
Bases: Transport
Kafka-based transport layer adapted to use FastStream's KafkaBroker for message publishing and consumption.
The class wraps a FastStream KafkaBroker and exposes methods to connect, disconnect, publish messages to Kafka topics, and subscribe to Kafka topics.
Attributes:
Name | Type | Description |
---|---|---|
broker | KafkaBroker | The KafkaBroker instance responsible for managing Kafka connections and messaging. |
Source code in eggai/transport/kafka.py
__init__(broker=None, bootstrap_servers='localhost:19092', **kwargs)
Initializes the KafkaTransport with an existing KafkaBroker, or creates a new one from the provided bootstrap servers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker | Optional[KafkaBroker] | An existing KafkaBroker instance to use. If not provided, a new instance will be created with the specified bootstrap servers and additional parameters. | None |
bootstrap_servers | str | The Kafka bootstrap server addresses (default is "localhost:19092"). | 'localhost:19092' |
**kwargs | | Additional keyword arguments to pass to the KafkaBroker if a new instance is created. | {} |
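The fallback described in the parameter table (reuse a supplied broker, otherwise build one) can be sketched as below. `FakeBroker` and `SketchTransport` are hypothetical stand-ins written for this illustration; they are not eggai or FastStream classes, and the real constructor may differ in detail.

```python
from typing import Any, Optional


class FakeBroker:
    """Hypothetical stand-in for FastStream's KafkaBroker (not the real class)."""

    def __init__(self, bootstrap_servers: str = "localhost:19092", **kwargs: Any) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.options = kwargs


class SketchTransport:
    """Illustrates the documented fallback: reuse a broker or build one."""

    def __init__(
        self,
        broker: Optional[FakeBroker] = None,
        bootstrap_servers: str = "localhost:19092",
        **kwargs: Any,
    ) -> None:
        if broker is not None:
            # An existing broker is used as-is; in this sketch the other
            # arguments are ignored in that case (an assumption).
            self.broker = broker
        else:
            # No broker supplied: create one from the address and extra kwargs.
            self.broker = FakeBroker(bootstrap_servers, **kwargs)


shared = FakeBroker("kafka-1:9092")
reused = SketchTransport(broker=shared)
created = SketchTransport(bootstrap_servers="kafka-2:9092", linger_ms=5)
```

Passing an existing broker is useful when several components should share one connection; the keyword arguments only matter when the transport has to create the broker itself.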
Attributes:
Name | Type | Description |
---|---|---|
bootstrap_servers | Union[str, Iterable[str]] | A list or string of |
request_timeout_ms | int | Client request timeout in milliseconds (default is 40,000 ms). |
retry_backoff_ms | int | Milliseconds to back off when retrying on errors (default is 100 ms). |
metadata_max_age_ms | int | Period after which to refresh metadata (default is 300,000 ms). |
connections_max_idle_ms | int | Close idle connections after a specified time (default is 540,000 ms). |
sasl_kerberos_service_name | str | Kerberos service name (default is |
sasl_kerberos_domain_name | Optional[str] | Kerberos domain name. |
sasl_oauth_token_provider | Optional[AbstractTokenProvider] | OAuthBearer token provider instance. |
loop | Optional[AbstractEventLoop] | Optional event loop. |
client_id | Optional[str] | A name for this client (default is |
acks | Union[Literal[0, 1, -1, all], object] | Number of acknowledgments the producer requires before considering a request complete (default is |
key_serializer | Optional[Callable[[Any], bytes]] | Function to serialize keys (default is |
value_serializer | Optional[Callable[[Any], bytes]] | Function to serialize values (default is |
compression_type | Optional[Literal[gzip, snappy, lz4, zstd]] | Compression type (default is |
max_batch_size | int | Maximum size of buffered data per partition (default is 16 KB). |
partitioner | Callable[[bytes, List[Partition], List[Partition]], Partition] | Partitioner function for assigning messages to partitions. |
max_request_size | int | Maximum size of a request (default is 1 MB). |
linger_ms | int | Time to delay requests for batching (default is 0 ms). |
enable_idempotence | bool | Whether to enable idempotence for the producer (default is |
transactional_id | Optional[str] | Transactional ID for producing messages (default is |
transaction_timeout_ms | int | Timeout for transactions (default is 60,000 ms). |
graceful_timeout | Optional[float] | Graceful shutdown timeout (default is 15.0). |
decoder | Optional[CustomCallable] | Custom decoder for messages (default is |
parser | Optional[CustomCallable] | Custom parser for messages (default is |
dependencies | Iterable[Depends] | Dependencies to apply to all broker subscribers (default is |
middlewares | Sequence[Union[BrokerMiddleware[ConsumerRecord], BrokerMiddleware[Tuple[ConsumerRecord, ...]]]] | |
security | Optional[BaseSecurity] | Security options for broker connection (default is |
asyncapi_url | Union[str, Iterable[str], None] | AsyncAPI server URL (default is |
protocol | Optional[str] | AsyncAPI server protocol (default is |
protocol_version | Optional[str] | AsyncAPI server protocol version (default is |
description | Optional[str] | AsyncAPI server description (default is |
tags | Optional[Iterable[Union[Tag, TagDict]]] | AsyncAPI server tags (default is |
logger | Optional[LoggerProto] | Custom logger to pass into context (default is |
log_level | int | Log level for service messages (default is |
log_fmt | Optional[str] | Log format (default is |
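As a rough illustration of how these options might be collected before construction, the sketch below keeps only the settings that differ from the defaults stated in the table. `DOCUMENTED_DEFAULTS` and `broker_kwargs` are hypothetical names introduced here; only entries whose default value is given explicitly in the table are listed.

```python
from typing import Any, Dict

# Defaults copied from the attribute table; entries whose default is not
# stated there are deliberately left out.
DOCUMENTED_DEFAULTS: Dict[str, Any] = {
    "request_timeout_ms": 40_000,
    "retry_backoff_ms": 100,
    "metadata_max_age_ms": 300_000,
    "connections_max_idle_ms": 540_000,
    "linger_ms": 0,
    "transaction_timeout_ms": 60_000,
    "graceful_timeout": 15.0,
}

_MISSING = object()  # Sentinel so unknown option names are always kept.


def broker_kwargs(**overrides: Any) -> Dict[str, Any]:
    """Hypothetical helper: drop overrides that merely restate a default."""
    return {
        name: value
        for name, value in overrides.items()
        if DOCUMENTED_DEFAULTS.get(name, _MISSING) != value
    }


options = broker_kwargs(linger_ms=0, request_timeout_ms=10_000, client_id="billing")
```

Given the constructor signature, a dictionary like `options` could then be splatted into the call, for example `KafkaTransport(bootstrap_servers="localhost:19092", **options)`.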
connect()
async
Establishes a connection to the Kafka broker by starting the KafkaBroker instance.
This method must be called before publishing or consuming messages. It asynchronously starts the broker to handle Kafka communication.
disconnect()
async
Closes the connection to the Kafka broker by closing the KafkaBroker instance.
This method should be called when the transport is no longer needed to stop consuming messages and to release any resources held by the KafkaBroker.
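The connect and disconnect descriptions suggest a simple lifecycle: connect first, do the work, and always disconnect afterwards. A minimal sketch of that pattern follows, using a hypothetical `InMemoryTransport` stand-in with the same method names so it runs without a Kafka cluster. Raising on an unconnected publish is an assumption of the sketch, not documented behavior.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class InMemoryTransport:
    """Hypothetical stand-in with the same connect/disconnect/publish shape."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        self.connected = False

    async def publish(self, channel: str, message: Dict[str, Any]) -> None:
        # Assumption of this sketch: publishing while disconnected is an error.
        if not self.connected:
            raise RuntimeError("connect() must be awaited before publish()")
        self.sent.append((channel, message))


async def run(transport: InMemoryTransport) -> None:
    await transport.connect()
    try:
        await transport.publish("orders", {"id": 1})
    finally:
        # Release the connection even if publishing fails.
        await transport.disconnect()


transport = InMemoryTransport()
asyncio.run(run(transport))
```

The `try`/`finally` block is the part worth copying: it ensures the broker's resources are released whether or not the work in between succeeds.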
publish(channel, message)
async
Publishes a message to the specified Kafka topic (channel).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel | str | The name of the Kafka topic to which the message will be published. | required |
message | Union[Dict[str, Any], BaseMessage] | The message to publish, which can either be a dictionary or a BaseMessage instance. The message will be serialized before being sent. | required |
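Because `publish` accepts either a dictionary or a BaseMessage, some normalization step has to turn both shapes into bytes. The sketch below shows one way that could look. The wire format is not stated in this reference, so JSON is an assumption, and `SketchMessage` with its field names is a hypothetical stand-in for BaseMessage.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Union


@dataclass
class SketchMessage:
    """Hypothetical stand-in for BaseMessage; the field names are assumptions."""

    type: str
    data: Dict[str, Any] = field(default_factory=dict)


def to_wire(message: Union[Dict[str, Any], SketchMessage]) -> bytes:
    """Normalize either accepted input shape to UTF-8 JSON bytes (assumed format)."""
    payload = asdict(message) if isinstance(message, SketchMessage) else message
    return json.dumps(payload, sort_keys=True).encode("utf-8")


from_dict = to_wire({"type": "order.created", "data": {"id": 1}})
from_model = to_wire(SketchMessage(type="order.created", data={"id": 1}))
```

Both inputs end up as the same bytes, which is the property a caller relies on when mixing plain dictionaries and typed messages on one topic.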
subscribe(channel, handler, **kwargs)
async
Subscribes to a Kafka topic (channel) and sets up a handler to process incoming messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel | str | The Kafka topic to subscribe to. | required |
handler | Callable | The function or coroutine that will handle messages received from the topic. | required |
**kwargs | | Additional keyword arguments that can be used to configure the subscription. | {} |
Other Parameters:
Name | Type | Description |
---|---|---|
filter_by_message | Callable | A function to filter incoming messages based on their payload. If provided, this function will be applied to the message payload before passing it to the handler. |
batch | bool | Whether to consume messages in batches or not (default is False). |
group_id | Optional[str] | The consumer group name for dynamic partition assignment and offset management. |
key_deserializer | Optional[Callable] | A function to deserialize the message key from raw bytes. |
value_deserializer | Optional[Callable] | A function to deserialize the message value from raw bytes. |
fetch_max_bytes | int | The maximum amount of data the server should return for a fetch request (default is 50 MB). |
fetch_min_bytes | int | The minimum amount of data the server should return for a fetch request (default is 1 byte). |
fetch_max_wait_ms | int | The maximum amount of time the server will block before responding to a fetch request (default is 500 ms). |
max_partition_fetch_bytes | int | The maximum amount of data per-partition the server will return (default is 1 MB). |
auto_offset_reset | str | A policy for resetting offsets on |
auto_commit | bool | Whether to automatically commit offsets (default is True). |
auto_commit_interval_ms | int | Interval in milliseconds between automatic offset commits (default is 5000 ms). |
check_crcs | bool | Whether to check CRC32 of records to ensure message integrity (default is True). |
partition_assignment_strategy | Sequence | List of strategies for partition assignment during group management (default is |
max_poll_interval_ms | int | Maximum allowed time between calls to consume messages in batches (default is 300000 ms). |
rebalance_timeout_ms | Optional[int] | Timeout for consumer rejoin during rebalance (default is None). |
session_timeout_ms | int | Client group session timeout (default is 10000 ms). |
heartbeat_interval_ms | int | The interval between heartbeats to the consumer coordinator (default is 3000 ms). |
consumer_timeout_ms | int | Maximum wait timeout for background fetching routine (default is 200 ms). |
max_poll_records | Optional[int] | The maximum number of records to fetch in one batch (default is None). |
exclude_internal_topics | bool | Whether to exclude internal topics such as offsets from being exposed to the consumer (default is True). |
isolation_level | str | Controls how to read messages written transactionally ('read_uncommitted' or 'read_committed', default is 'read_uncommitted'). |
batch_timeout_ms | int | Milliseconds to wait for data in the buffer if no data is available (default is 200 ms). |
max_records | Optional[int] | Number of messages to consume in one batch (default is None). |
listener | Optional[ConsumerRebalanceListener] | Optionally provide a listener for consumer group rebalances (default is None). |
pattern | Optional[str] | Pattern to match available topics (either this or |
partitions | Collection[TopicPartition] | Explicit list of partitions to assign (can't use with |
Returns:
Name | Type | Description |
---|---|---|
Callable | Callable | A callback function that represents the subscription. When invoked, it will call the handler with incoming messages. |
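To make the interplay between `handler`, `filter_by_message`, and the returned callback more concrete, here is a minimal sketch of a wrapper that applies the filter to the payload before calling the handler. `make_subscription` is a hypothetical helper written for illustration, the handler is assumed to be a coroutine, and the real method may wire these pieces together differently.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


def make_subscription(
    handler: Handler,
    filter_by_message: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> Handler:
    """Hypothetical helper: wrap a handler so it only sees accepted payloads."""

    async def on_message(payload: Dict[str, Any]) -> None:
        if filter_by_message is not None and not filter_by_message(payload):
            return  # Payload rejected by the filter: the handler is not called.
        await handler(payload)

    return on_message


received: List[Dict[str, Any]] = []


async def handle(payload: Dict[str, Any]) -> None:
    received.append(payload)


async def main() -> None:
    callback = make_subscription(handle, lambda p: p.get("type") == "order.created")
    await callback({"type": "order.created", "id": 1})
    await callback({"type": "order.cancelled", "id": 2})


asyncio.run(main())
```

Only the first payload reaches `handle`; the second is dropped by the filter before the handler runs.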