Kafka Broker filter
The Apache Kafka broker filter decodes the client protocol for Apache Kafka, both the requests and responses in the payload. The message versions in Kafka 3.8.0 are supported.
By default, the filter tries not to influence the communication between clients and brokers, so messages that cannot be decoded (because the Kafka client or broker is running a newer version than this filter supports) are forwarded as-is. However, this mode requires the upstream Kafka cluster to be configured in a proxy-aware fashion (see Configuration (no traffic mutation)).
If configured to mutate the received traffic, the Envoy broker filter can be used to proxy a Kafka broker without any changes to the broker configuration. This requires the broker filter to be provided with rewrite rules, so that the addresses advertised by the Kafka brokers can be changed to the Envoy listener addresses (see Configuration (with traffic mutation)).
This filter should be configured with the type URL type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker.
Attention
The Kafka broker filter is only included in contrib images.
Attention
The kafka_broker filter is experimental and is currently under active development. Capabilities will be expanded over time and the configuration structures are likely to change.
Configuration (no traffic mutation)
The Kafka Broker filter can run without rewriting any requests / responses.
The filter should be chained with the TCP proxy filter as shown in the snippet below:
listeners:
- address:
    socket_address:
      address: 127.0.0.1 # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19092 # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: localkafka
clusters:
- name: localkafka
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: 127.0.0.1 # Kafka broker's host.
              port_value: 9092 # Kafka broker's port.
The Kafka broker then needs to advertise the Envoy listener address and port instead of its own, so that downstream clients make all new connections through Envoy only.
# Listener value needs to be equal to cluster value in Envoy config
# (will receive payloads from Envoy).
listeners=PLAINTEXT://127.0.0.1:9092
# Advertised listener value needs to be equal to Envoy's listener
# (will make clients discovering this broker talk to it through Envoy).
advertised.listeners=PLAINTEXT://127.0.0.1:19092
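The pairing between the broker properties and the Envoy configuration is easy to get wrong, so a small sanity check can help. The following is a minimal sketch, not part of Kafka or Envoy: the helper names parse_listener and check_broker_properties are hypothetical, and the sketch assumes that each property holds a single listener (real brokers may configure a comma-separated list).

```python
from urllib.parse import urlsplit


def parse_listener(value):
    """Split one Kafka listener such as PLAINTEXT://127.0.0.1:9092 into (host, port)."""
    parts = urlsplit(value.strip())
    return parts.hostname, parts.port


def check_broker_properties(properties_text, envoy_listener, envoy_upstream):
    """Return a list of mismatches between broker properties and the Envoy config.

    envoy_listener: (host, port) that Envoy listens on for Kafka clients.
    envoy_upstream: (host, port) of the cluster endpoint Envoy forwards to.
    Assumption: each property contains exactly one listener.
    """
    props = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    problems = []
    if parse_listener(props.get("listeners", "")) != envoy_upstream:
        problems.append("listeners does not match the Envoy cluster endpoint")
    if parse_listener(props.get("advertised.listeners", "")) != envoy_listener:
        problems.append("advertised.listeners does not match the Envoy listener")
    return problems
```

Called with the properties above, ("127.0.0.1", 19092) as the Envoy listener and ("127.0.0.1", 9092) as the upstream endpoint, the function returns an empty list.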
Configuration (with traffic mutation)
The Kafka Broker filter can mutate the contents of received responses to enable easier proxying of Kafka clusters.
The example below shows a configuration for an Envoy instance that proxies the brokers in a 2-node cluster:
listeners:
- address: # This listener proxies broker 1.
    socket_address:
      address: envoy.example.org # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19092 # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix1
        id_based_broker_address_rewrite_spec: &kafka_rewrite_spec
          rules:
          - id: 1
            host: envoy.example.org
            port: 19092
          - id: 2
            host: envoy.example.org
            port: 19093
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: broker1cluster
- address: # This listener proxies broker 2.
    socket_address:
      address: envoy.example.org # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19093 # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix2
        id_based_broker_address_rewrite_spec: *kafka_rewrite_spec
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: broker2cluster
clusters:
- name: broker1cluster
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: broker1.example.org # Kafka broker's host for broker 1.
              port_value: 9092 # Kafka broker's port for broker 1.
- name: broker2cluster
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: broker2.example.org # Kafka broker's host for broker 2.
              port_value: 9092 # Kafka broker's port for broker 2.
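The address rewrite rules should cover all brokers present in the cluster. YAML anchors and aliases (such as &kafka_rewrite_spec and *kafka_rewrite_spec in the example) can be used to avoid repeating the rules for every listener.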
The responses that can be mutated are:
metadata (all partition discovery operations),
find coordinator (used by consumer groups and transactions),
describe cluster.
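To make the effect of the rewrite rules concrete, the sketch below applies the two rules from the example to a list of brokers, as might appear in a metadata response. It is an illustration only and not the filter's implementation: the Broker type and the rewrite_brokers function are hypothetical names, and leaving brokers without a matching rule unchanged is an assumption of this sketch.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Broker:
    node_id: int
    host: str
    port: int


# Mirrors the rules from the example configuration: broker id -> Envoy listener.
REWRITE_RULES = {
    1: ("envoy.example.org", 19092),
    2: ("envoy.example.org", 19093),
}


def rewrite_brokers(brokers, rules):
    """Replace the advertised host and port of every broker that has a rule.

    Brokers without a matching rule are returned unchanged (an assumption
    made for this sketch, not documented filter behavior).
    """
    rewritten = []
    for broker in brokers:
        if broker.node_id in rules:
            host, port = rules[broker.node_id]
            broker = replace(broker, host=host, port=port)
        rewritten.append(broker)
    return rewritten
```

With these rules, a broker advertised as broker1.example.org:9092 with id 1 is presented to clients as envoy.example.org:19092, so the client's next connection goes to the Envoy listener that proxies broker 1.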
Filtering requests
The broker filter can be used to filter out unwanted types of requests, e.g. Fetch or Produce requests. Both allowlists and denylists are supported.
For example, to allow only basic producer access, we can limit access to the related requests:
- name: envoy.filters.network.kafka_broker
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
    stat_prefix: prefix
    api_keys_allowed:
    - 0 # Produce
    - 3 # Metadata
    - 18 # API versions
To disable consumers’ read capability, we can just disable Fetch requests:
- name: envoy.filters.network.kafka_broker
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
    stat_prefix: prefix
    api_keys_denied:
    - 1 # Fetch
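The decision that the two lists express can be sketched as a simple predicate. This is an illustration rather than the filter's code: is_request_permitted is a hypothetical name, and the sketch assumes that an empty list imposes no restriction and that the denylist takes precedence if both lists are set, neither of which is stated in this section.

```python
def is_request_permitted(api_key, allowed=(), denied=()):
    """Decide whether a request with the given Kafka API key passes the filter.

    Assumptions of this sketch: an empty list means "no restriction",
    and a key present in the denylist is rejected even if it is also allowed.
    """
    if api_key in denied:
        return False
    if allowed and api_key not in allowed:
        return False
    return True


# API keys used in the examples above.
PRODUCE, FETCH, METADATA, API_VERSIONS = 0, 1, 3, 18
```

For the producer-only configuration, is_request_permitted(FETCH, allowed=[PRODUCE, METADATA, API_VERSIONS]) is False, while the same call with PRODUCE is True.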
Debugging
Java clients can see the hosts used if they set the log level of org.apache.kafka.clients.NetworkClient to debug - only Envoy’s listeners should be visible in the logs.
[DEBUG] [NetworkClient] Initiating connection to node localhost:19092 (id: -1 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node -1. Fetching API versions.
[DEBUG] [NetworkClient] Initiating connection to node localhost:19092 (id: 1 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node 1. Fetching API versions.
[DEBUG] [NetworkClient] Initiating connection to node localhost:19094 (id: 3 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Initiating connection to node localhost:19093 (id: 2 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node 2. Fetching API versions.
[DEBUG] [NetworkClient] Completed connection to node 3. Fetching API versions.
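Scanning such logs by eye gets tedious for larger clusters. The sketch below extracts the host, port, and node id from the "Initiating connection to node" lines shown above and reports any connection that does not target a known Envoy listener. The function name and the regular expression are assumptions based only on the log format in this example; other client versions may format these lines differently.

```python
import re

# Matches e.g. "Initiating connection to node localhost:19092 (id: -1 rack: null)".
NODE_PATTERN = re.compile(r"Initiating connection to node (\S+):(\d+) \(id: (-?\d+)")


def unexpected_hosts(log_lines, envoy_listeners):
    """Return (host, port, node_id) for connections not aimed at an Envoy listener.

    envoy_listeners: a set of (host, port) tuples that Envoy listens on.
    """
    unexpected = []
    for line in log_lines:
        match = NODE_PATTERN.search(line)
        if match is None:
            continue  # not a connection-initiation line
        host = match.group(1)
        port = int(match.group(2))
        node_id = int(match.group(3))
        if (host, port) not in envoy_listeners:
            unexpected.append((host, port, node_id))
    return unexpected
```

An empty result means every connection the client opened went through Envoy. Any entry in the result points to a broker that is still advertising its own address.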
Statistics
Every configured Kafka Broker filter has statistics rooted at kafka.<stat_prefix>., with multiple statistics per message type.
| Name | Type | Description |
|---|---|---|
| request.TYPE | Counter | Number of times a request of particular type was received from Kafka client |
| request.unknown | Counter | Number of times a request with format not recognized by this filter was received |
| request.failure | Counter | Number of times a request with invalid format was received or other processing exception occurred |
| response.TYPE | Counter | Number of times a response of particular type was received from Kafka broker |
| response.TYPE_duration | Histogram | Response generation time in milliseconds |
| response.unknown | Counter | Number of times a response with format not recognized by this filter was received |
| response.failure | Counter | Number of times a response with invalid format was received or other processing exception occurred |
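When inspecting these statistics, it can help to pull out only the counters that belong to one filter instance. The sketch below assumes the plain-text "name: value" layout that Envoy's admin stats endpoint produces and keeps only integer-valued entries under kafka.<stat_prefix>., so histogram lines are skipped. The function name kafka_counters is hypothetical.

```python
def kafka_counters(stats_text, stat_prefix):
    """Collect integer counters rooted at kafka.<stat_prefix>. from stats text.

    stats_text is expected to hold one "name: value" entry per line.
    Returned keys have the kafka.<stat_prefix>. root stripped.
    """
    root = f"kafka.{stat_prefix}."
    counters = {}
    for line in stats_text.splitlines():
        name, sep, value = line.partition(": ")
        if not sep or not name.startswith(root):
            continue  # malformed line or a statistic of another component
        value = value.strip()
        if value.isdigit():  # histograms and other non-integer values are skipped
            counters[name[len(root):]] = int(value)
    return counters
```

A non-zero request.unknown or response.unknown counter in the result suggests that the client or broker is using message versions newer than the filter supports.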