Kafka (Legacy)
Overview
Kafka (Legacy) consumes records from one or more Kafka topics using the legacy NetEnrich receiver. It is intentionally a separate component from the standard Kafka source. Use it when you need any of the following:
- Static partition assignment (no consumer group, no rebalance): pin a specific receiver to a known set of partitions for deterministic throughput.
- ZooKeeper-based broker and partition discovery: useful for older Kafka deployments where the list of bootstrap brokers is not stable.
- Local file-backed offset storage: offsets are committed to a JSON file on the collector host rather than back to Kafka.
If none of these apply, prefer the standard Kafka source. It uses consumer groups and is the right choice for most workloads.
Supported types: Logs (the receiver decodes the record value into log records using the configured encoding)
Basic Configuration
| Parameter | Type | Default | Required | Description |
|---|---|---|---|---|
| brokers | string[] | — | Yes (or enable ZK discovery) | Bootstrap broker addresses (host:port). Required unless zookeeper.discover_brokers is true. |
| topics | string[] | — | Yes | Topics to consume from. At least one is required. |
| partitions | string | — | Yes (or enable ZK discovery) | Comma-separated list of partition IDs to consume (e.g. "0,1,3"). Required unless zookeeper.discover_partitions is true. |
| encoding | string | text | No | How the record value is decoded into a log record. One of: raw, text, json, json_object, json_array, json_lines, ndjson. |
| initial_offset | string | latest | No | Where to start reading when no committed offset is found. One of: latest, earliest. |
| client_id | string | otel-collector-legacy | No | Kafka client ID sent on every fetch. Useful for broker-side logging and quotas. |
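The required-field rules in the table above can be expressed as a pre-flight check. This is a minimal sketch, assuming the configuration has been loaded into a Python dict shaped like the JSON examples on this page; check_config and parse_partitions are hypothetical helpers, not part of the receiver, and the receiver's own validation may differ (for example in how it treats duplicate or whitespace-padded partition IDs).

```python
from typing import Any

# Allowed values, copied from the Basic Configuration table.
ENCODINGS = {"raw", "text", "json", "json_object", "json_array", "json_lines", "ndjson"}
INITIAL_OFFSETS = {"latest", "earliest"}


def parse_partitions(spec: str) -> list[int]:
    """Turn a comma-separated partition string such as "0,1,3" into sorted, de-duplicated IDs."""
    ids = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            raise ValueError(f"empty partition id in {spec!r}")
        value = int(part)  # raises ValueError for non-numeric input
        if value < 0:
            raise ValueError(f"negative partition id: {value}")
        ids.add(value)
    return sorted(ids)


def check_config(cfg: dict[str, Any]) -> list[str]:
    """Return a list of rule violations; an empty list means the config passes."""
    problems = []
    zk = cfg.get("zookeeper", {})
    if not cfg.get("topics"):
        problems.append("topics: at least one topic is required")
    if not cfg.get("brokers") and not zk.get("discover_brokers", False):
        problems.append("brokers: required unless zookeeper.discover_brokers is true")
    if not cfg.get("partitions") and not zk.get("discover_partitions", False):
        problems.append("partitions: required unless zookeeper.discover_partitions is true")
    # zookeeper.servers is required as soon as any other zookeeper field is set.
    if any(key != "servers" for key in zk) and not zk.get("servers"):
        problems.append("zookeeper.servers: required when other zookeeper fields are set")
    if cfg.get("encoding", "text") not in ENCODINGS:
        problems.append(f"encoding: must be one of {sorted(ENCODINGS)}")
    if cfg.get("initial_offset", "latest") not in INITIAL_OFFSETS:
        problems.append("initial_offset: must be latest or earliest")
    if cfg.get("partitions"):
        try:
            parse_partitions(cfg["partitions"])
        except ValueError as err:
            problems.append(f"partitions: {err}")
    return problems
```

A static config with brokers, topics, and partitions passes, as does a ZooKeeper-only config that enables both discovery flags and lists zookeeper.servers.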
Fetch Tuning
| Parameter | Type | Default | Description |
|---|---|---|---|
| workers | int | 1 | Number of goroutines processing fetched record batches. Increase for high-throughput topics. |
| fetch_min_bytes | int | 1 | Minimum bytes the broker waits to accumulate before responding to a fetch. Higher values trade latency for throughput. |
| fetch_max_bytes | int | 1048576 (1 MiB) | Hard cap on bytes returned per fetch across all partitions. |
| fetch_max_partition_bytes | int | 1048576 (1 MiB) | Hard cap on bytes returned per partition per fetch. |
| fetch_max_wait | duration | 250ms | Maximum time the broker waits for fetch_min_bytes to accumulate. |
| metadata_refresh_interval | duration | 10m | How often the receiver refreshes broker/topic metadata. |
| broker_request_timeout | duration | 5s | Cap on how long franz-go waits for a broker TCP response before marking the connection dead. Each poll uses fetch_max_wait + this value as its deadline. Raise for high-latency Kafka 0.8.2 brokers; lower for faster failure detection. |
| max_poll_records | int | broker default | Cap on records returned per fetch. Lower to bound per-batch processing time. |
| partition_queue_size | int | franz-go default | Per-partition in-memory record queue depth. Raise for bursty partitions when downstream processing keeps up on average. |
| record_age_sample_every | int | — | Sample every Nth record for the kafkalegacy_record_age_seconds metric (broker-vs-receiver lag). |
| propagate_record_metadata | bool | true | When true, Kafka record metadata (topic, partition, offset, key, headers) is added as log record attributes. |
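The broker_request_timeout row states that each poll's deadline is fetch_max_wait plus broker_request_timeout. This is a minimal sketch of that arithmetic, assuming durations use Go-style strings such as "250ms" or "1m30s" (only the ms/s/m/h units are handled); parse_duration and poll_deadline are hypothetical helpers, not receiver code.

```python
import re
from datetime import timedelta

# Subset of Go-style duration syntax ("250ms", "5s", "10m", "1m30s").
# Assumption: the receiver accepts Go time.ParseDuration strings; this sketch
# only handles the ms/s/m/h units that appear on this page.
_UNITS = {
    "ms": timedelta(milliseconds=1),
    "s": timedelta(seconds=1),
    "m": timedelta(minutes=1),
    "h": timedelta(hours=1),
}
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|s|m|h)")


def parse_duration(text: str) -> timedelta:
    total, pos = timedelta(), 0
    for match in _PART.finditer(text):
        if match.start() != pos:  # reject junk between components
            raise ValueError(f"bad duration: {text!r}")
        total += float(match.group(1)) * _UNITS[match.group(2)]
        pos = match.end()
    if pos == 0 or pos != len(text):  # nothing parsed, or trailing junk
        raise ValueError(f"bad duration: {text!r}")
    return total


def poll_deadline(cfg: dict) -> timedelta:
    """Per-poll deadline as documented: fetch_max_wait + broker_request_timeout."""
    fetch_max_wait = parse_duration(cfg.get("fetch_max_wait", "250ms"))
    request_timeout = parse_duration(cfg.get("broker_request_timeout", "5s"))
    return fetch_max_wait + request_timeout
```

With the defaults, a poll's deadline is 250ms + 5s = 5.25s. Raising broker_request_timeout for slow 0.8.2 brokers lengthens every poll's worst case by the same amount.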
Header Extraction
| Parameter | Type | Default | Description |
|---|---|---|---|
| extract_headers | bool | false | When true, Kafka record headers are extracted and added as log record attributes. |
| header_keys | string[] | — | When extract_headers is true, only these header keys are extracted. If empty, all headers are extracted. |
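The interaction between extract_headers and header_keys can be sketched as a filter over one record's headers. This assumes headers arrive as (key, value) byte pairs and that key matching is exact and case-sensitive; select_headers is a hypothetical helper, and how the receiver names the resulting attributes is not shown.

```python
from collections.abc import Iterable
from typing import Optional


def select_headers(headers: Iterable[tuple[str, bytes]],
                   extract_headers: bool,
                   header_keys: Optional[list[str]] = None) -> list[tuple[str, bytes]]:
    """Apply the extract_headers / header_keys rules to one record's headers.

    Kafka allows repeated header keys, so matching pairs are kept in order
    rather than collapsed into a dict.
    """
    if not extract_headers:
        return []  # header extraction disabled
    headers = list(headers)
    if not header_keys:  # unset or empty list: extract every header
        return headers
    wanted = set(header_keys)
    return [(key, value) for key, value in headers if key in wanted]
```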
Offset Storage
| Parameter | Type | Default | Description |
|---|---|---|---|
| offset_store_path | string | ${COL_HOME}/metadata/kafkalegacy_offsets.json | File path on the collector host where consumed offsets are persisted. Must be in a directory writable by the collector user. |
| offset_commit_interval | duration | 5s | How often consumed offsets are flushed to disk. Shorter intervals reduce the number of records re-delivered after a crash, at the cost of more disk I/O. |
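The trade-off behind offset_commit_interval can be illustrated with a small file-backed store: progress is held in memory and only reaches offset_store_path on each flush, so anything processed since the last flush is re-delivered after a crash. This is a minimal sketch; FileOffsetStore, its "topic/partition" key layout, the write-to-temp-then-rename approach, and flushing from mark() instead of a background timer are all assumptions for illustration, not the receiver's actual file format or code.

```python
import json
import os
import tempfile
import time
from typing import Optional


class FileOffsetStore:
    """Hypothetical offset store. Keys are "topic/partition" and values are the
    next offset to read; this layout is an assumption, not the receiver's format."""

    def __init__(self, path: str, commit_interval_s: float = 5.0):
        self.path = path
        self.commit_interval_s = commit_interval_s
        self._last_flush = time.monotonic()
        self._offsets: dict[str, int] = self._load()

    def _load(self) -> dict[str, int]:
        try:
            with open(self.path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}  # nothing committed yet: the caller falls back to initial_offset

    def committed(self, topic: str, partition: int) -> Optional[int]:
        return self._offsets.get(f"{topic}/{partition}")

    def mark(self, topic: str, partition: int, next_offset: int) -> None:
        """Record progress in memory; it survives a crash only after the next flush."""
        self._offsets[f"{topic}/{partition}"] = next_offset
        if time.monotonic() - self._last_flush >= self.commit_interval_s:
            self.flush()

    def flush(self) -> None:
        """Write offsets to a temp file, fsync it, then rename it over the target
        so readers see either the old file or the new one, never a partial write."""
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".offsets-")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(self._offsets, f, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.path)
        except BaseException:
            os.unlink(tmp_path)  # replace did not happen, so the temp file still exists
            raise
        self._last_flush = time.monotonic()
```

Writing the temp file in the same directory as the target keeps os.replace on one filesystem, which is what makes the rename atomic on POSIX systems.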
ZooKeeper Discovery (optional)
Configure the zookeeper block to discover brokers and/or partitions from ZooKeeper instead of listing them statically in brokers and partitions. The block is required when zookeeper.discover_brokers or zookeeper.discover_partitions is true.
| Parameter | Type | Default | Description |
|---|---|---|---|
| zookeeper.servers | string[] | — | ZooKeeper ensemble addresses (host:port, usually port 2181). Required when any other zookeeper field is set. |
| zookeeper.chroot | string | — | ZooKeeper chroot path used by the Kafka cluster (e.g. /kafka). |
| zookeeper.session_timeout | duration | ZK default | Session timeout for the ZK connection. |
| zookeeper.discover_brokers | bool | false | When true, the receiver discovers bootstrap brokers from ZK and brokers becomes optional. |
| zookeeper.discover_partitions | bool | false | When true, the receiver discovers partition assignments from ZK and partitions becomes optional. |
| zookeeper.refresh_interval | duration | — | How often the receiver re-reads broker and partition info from ZK. |
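For reference, Kafka clusters that use ZooKeeper register each broker under /brokers/ids/<id> and each topic's partition assignment under /brokers/topics/<topic>, both below the cluster's chroot. The sketch below parses those payloads and deliberately omits the ZooKeeper client itself. zk_path, broker_address, and partition_ids are hypothetical helpers; which znodes this receiver actually reads, and how it picks among multiple listeners, are assumptions.

```python
import json
import posixpath


def zk_path(chroot: str, *parts: str) -> str:
    """Join the Kafka chroot (e.g. "/kafka", or empty for none) with a znode path."""
    return posixpath.join(chroot or "/", *parts)


def broker_address(registration: bytes) -> str:
    """host:port from a /brokers/ids/<id> registration payload."""
    info = json.loads(registration)
    if info.get("host"):
        return f"{info['host']}:{info['port']}"
    # Newer brokers may leave host null and list endpoints like "PLAINTEXT://h:9092".
    # Assumption: taking the first endpoint is good enough for this sketch.
    endpoints = info.get("endpoints") or []
    if endpoints:
        return endpoints[0].split("://", 1)[1]
    raise ValueError("registration has no usable address")


def partition_ids(topic_state: bytes) -> list[int]:
    """Partition IDs from a /brokers/topics/<topic> payload ({"partitions": {"0": [...], ...}})."""
    return sorted(int(p) for p in json.loads(topic_state)["partitions"])
```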
Example Configuration
```json
{
  "brokers": ["broker1.kafka.svc:9092", "broker2.kafka.svc:9092"],
  "topics": ["app.events", "app.audit"],
  "partitions": "0,1,2,3",
  "encoding": "json",
  "initial_offset": "earliest",
  "client_id": "praxis-collector-legacy",
  "workers": 4,
  "fetch_min_bytes": 1024,
  "fetch_max_bytes": 4194304,
  "fetch_max_wait": "500ms",
  "metadata_refresh_interval": "5m",
  "propagate_record_metadata": true,
  "extract_headers": true,
  "header_keys": ["trace-id", "tenant-id"],
  "offset_store_path": "/var/lib/praxis-collector/kafka_offsets.json",
  "offset_commit_interval": "10s"
}
```
With ZooKeeper discovery
```json
{
  "topics": ["app.events"],
  "encoding": "json",
  "zookeeper": {
    "servers": ["zk-0.zk.svc:2181", "zk-1.zk.svc:2181", "zk-2.zk.svc:2181"],
    "chroot": "/kafka",
    "discover_brokers": true,
    "discover_partitions": true,
    "refresh_interval": "1m"
  }
}
```
Limitations
- No consumer group and no rebalancing. Each receiver instance must own a fixed set of partitions. Do not run two receivers against the same topic and partition at the same time: each commits its own offsets independently, so both consume every record and downstream receives duplicates.
- Offset storage is local to each host. If the collector pod moves to a different node and the offset file is not on a persistent volume, the receiver falls back to initial_offset.
- Logs only. Decoding record values into metrics or traces is not supported by this receiver; use the standard Kafka source for those signals.
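Because there is no consumer group, splitting a topic across several receiver instances is a manual step. One way to plan it is sketched below, assuming every instance consumes the same topics and the same partition IDs apply to each of them; assign_partitions and overlapping are hypothetical planning helpers, not part of the collector.

```python
def assign_partitions(num_partitions: int, num_receivers: int) -> list[str]:
    """Split partitions 0..num_partitions-1 round-robin across receivers.

    Returns one "partitions" string per receiver instance. Each partition
    appears in exactly one string, so no two instances share a partition.
    """
    if num_receivers < 1:
        raise ValueError("need at least one receiver")
    buckets: list[list[int]] = [[] for _ in range(num_receivers)]
    for p in range(num_partitions):
        buckets[p % num_receivers].append(p)
    return [",".join(str(p) for p in bucket) for bucket in buckets]


def overlapping(assignments: list[str]) -> set[int]:
    """Partition IDs claimed by more than one receiver instance."""
    seen: set[int] = set()
    dupes: set[int] = set()
    for spec in assignments:
        # De-duplicate within one instance so only cross-instance overlap is reported.
        for p in {int(x) for x in spec.split(",") if x.strip()}:
            if p in seen:
                dupes.add(p)
            seen.add(p)
    return dupes
```

Each returned string goes into one instance's partitions field. If there are more receivers than partitions, some strings come back empty; such an instance has nothing to consume and would fail the partitions requirement, so keep the instance count at or below the partition count.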