
Kafka (Legacy)

Overview

Kafka (Legacy) consumes records from one or more Kafka topics using the legacy NetEnrich receiver. It is intentionally a separate component from the standard Kafka source. Use it when you need:

  • Static partition assignment (no consumer group, no rebalance): pin a specific receiver to a known set of partitions for deterministic throughput.
  • ZooKeeper-based broker and partition discovery: useful for older Kafka deployments where the bootstrap broker list is not stable.
  • Local file-backed offset storage: offsets are committed to a JSON file on the collector host, not back to Kafka.

If you don't have one of these requirements, prefer the standard Kafka source. It uses consumer groups and is the right choice for most workloads.

Supported types: Logs (the receiver decodes the record value into log records using the configured encoding)

Basic Configuration

Parameter | Type | Default | Required | Description
--- | --- | --- | --- | ---
brokers | string[] | | Yes (or enable ZK discovery) | Bootstrap broker addresses (host:port). Required unless zookeeper.discover_brokers is true.
topics | string[] | | Yes | Topics to consume from. At least one is required.
partitions | string | | Yes (or enable ZK discovery) | Comma-separated list of partition IDs to consume ("0,1,3"). Required unless zookeeper.discover_partitions is true.
encoding | string | text | No | How the record value is decoded into a log record. One of: raw, text, json, json_object, json_array, json_lines, ndjson.
initial_offset | string | latest | No | Where to start reading when no committed offset is found. One of: latest, earliest.
client_id | string | otel-collector-legacy | No | Kafka client ID sent on every fetch. Useful for broker-side logging and quotas.
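The partitions value and initial_offset work together. partitions selects which partitions a receiver reads. initial_offset only matters for a partition that has no entry in the offset file.

The Go sketch below illustrates that logic under stated assumptions:

- parsePartitions and startOffset are hypothetical helpers, not the receiver's code.
- The offset file is assumed to store the next offset to read.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// parsePartitions turns a partitions value such as "0,1,3" into a sorted,
// de-duplicated slice of partition IDs. Tolerating whitespace around IDs is
// an assumption; the receiver may be stricter.
func parsePartitions(s string) ([]int32, error) {
	seen := make(map[int32]bool)
	var out []int32
	for _, field := range strings.Split(s, ",") {
		field = strings.TrimSpace(field)
		if field == "" {
			return nil, fmt.Errorf("empty partition ID in %q", s)
		}
		n, err := strconv.ParseInt(field, 10, 32)
		if err != nil || n < 0 {
			return nil, fmt.Errorf("invalid partition ID %q", field)
		}
		if !seen[int32(n)] {
			seen[int32(n)] = true
			out = append(out, int32(n))
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out, nil
}

// startOffset picks where to begin reading one partition (hypothetical).
// committed is the next offset recorded in the local offset file, with ok
// false when the file has no entry; low and high are the partition's
// earliest and latest offsets as reported by the broker.
func startOffset(initial string, committed int64, ok bool, low, high int64) (int64, error) {
	if ok {
		return committed, nil // a stored offset always wins over initial_offset
	}
	switch initial {
	case "earliest":
		return low, nil
	case "latest", "": // latest is the documented default
		return high, nil
	default:
		return 0, fmt.Errorf("unknown initial_offset %q", initial)
	}
}

func main() {
	parts, err := parsePartitions("0,1,3")
	fmt.Println(parts, err) // [0 1 3] <nil>

	off, _ := startOffset("earliest", 0, false, 100, 250)
	fmt.Println(off) // 100
}
```

A production receiver also has to handle a stored offset that has fallen below the earliest retained offset, for example after retention deletes old segments. This sketch does not.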

Fetch Tuning

Parameter | Type | Default | Description
--- | --- | --- | ---
workers | int | 1 | Number of goroutines processing fetched record batches. Increase for high-throughput topics.
fetch_min_bytes | int | 1 | Minimum bytes the broker waits to accumulate before responding to a fetch. Higher values trade latency for throughput.
fetch_max_bytes | int | 1048576 (1 MiB) | Hard cap on bytes returned per fetch across all partitions.
fetch_max_partition_bytes | int | 1048576 (1 MiB) | Hard cap on bytes returned per partition per fetch.
fetch_max_wait | duration | 250ms | Maximum time the broker waits for fetch_min_bytes to accumulate.
metadata_refresh_interval | duration | 10m | How often the receiver refreshes broker/topic metadata.
broker_request_timeout | duration | 5s | Cap on how long franz-go waits for a broker TCP response before marking the connection dead. Each poll uses fetch_max_wait + this value as its deadline. Raise for high-latency Kafka 0.8.2 brokers; lower for faster failure detection.
max_poll_records | int | broker default | Cap on records returned per fetch. Lower to bound per-batch processing time.
partition_queue_size | int | franz-go default | Per-partition in-memory record queue depth. Raise for bursty partitions when downstream processing keeps up on average.
record_age_sample_every | int | | Sample every Nth record for the kafkalegacy_record_age_seconds metric (broker-vs-receiver lag).
propagate_record_metadata | bool | true | When true, Kafka record metadata (topic, partition, offset, key, headers) is added as log record attributes.
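Two rows of the table describe small calculations:

- The per-poll deadline is fetch_max_wait plus broker_request_timeout. The defaults give 250ms + 5s = 5.25s. The example configuration's fetch_max_wait of 500ms gives 5.5s.
- record_age_sample_every limits how often the record-age metric is computed.

The Go sketch below models both. Assumptions:

- ageSampler, shouldSample and recordAge are hypothetical names.
- Treating every <= 0 as "sampling disabled" is assumed, because the table lists no default.

```go
package main

import (
	"fmt"
	"time"
)

// pollDeadline follows the table: each poll may wait fetch_max_wait for data
// plus broker_request_timeout for the broker's response.
func pollDeadline(fetchMaxWait, brokerRequestTimeout time.Duration) time.Duration {
	return fetchMaxWait + brokerRequestTimeout
}

// ageSampler is a hypothetical helper that selects every Nth record for the
// record-age metric.
type ageSampler struct {
	every int // record_age_sample_every; <= 0 disables sampling (assumption)
	seen  int // records counted since the last sample
}

// shouldSample reports whether the current record is the Nth since the last sample.
func (s *ageSampler) shouldSample() bool {
	if s.every <= 0 {
		return false
	}
	s.seen++
	if s.seen < s.every {
		return false
	}
	s.seen = 0
	return true
}

// recordAge is the lag between the record's Kafka timestamp and the moment
// the receiver processed it.
func recordAge(recordTS, now time.Time) time.Duration {
	return now.Sub(recordTS)
}

func main() {
	fmt.Println(pollDeadline(250*time.Millisecond, 5*time.Second)) // 5.25s
	fmt.Println(pollDeadline(500*time.Millisecond, 5*time.Second)) // 5.5s

	s := &ageSampler{every: 100}
	now := time.Now()
	for i := 0; i < 250; i++ {
		if s.shouldSample() {
			// Records 99 and 199 are sampled; each pretends to be 2s old.
			fmt.Println("record", i, "age", recordAge(now.Add(-2*time.Second), now))
		}
	}
}
```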

Header Extraction

Parameter | Type | Default | Description
--- | --- | --- | ---
extract_headers | bool | false | When true, Kafka record headers are extracted and added as log record attributes.
header_keys | string[] | | When extract_headers is true, only these header keys are extracted. An empty list extracts all headers.
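Below is a minimal Go sketch of the header filter described above. It assumes headers arrive as key/value byte pairs, as they do in the Kafka record format.

The header type and the kafka.header. attribute prefix are hypothetical. The table does not say how extracted headers are named on the log record.

```go
package main

import "fmt"

// header is a minimal stand-in for a Kafka record header.
type header struct {
	Key   string
	Value []byte
}

// extractHeaders returns the headers to attach as log attributes.
// An empty keys list keeps every header. Kafka allows repeated header keys;
// in this sketch the last value for a key wins.
func extractHeaders(enabled bool, keys []string, headers []header) map[string]string {
	attrs := make(map[string]string)
	if !enabled {
		return attrs
	}
	allow := make(map[string]bool, len(keys))
	for _, k := range keys {
		allow[k] = true
	}
	for _, h := range headers {
		if len(allow) > 0 && !allow[h.Key] {
			continue // not in header_keys
		}
		attrs["kafka.header."+h.Key] = string(h.Value) // hypothetical attribute name
	}
	return attrs
}

func main() {
	hs := []header{
		{"trace-id", []byte("abc123")},
		{"tenant-id", []byte("acme")},
		{"content-type", []byte("json")},
	}
	fmt.Println(extractHeaders(true, []string{"trace-id", "tenant-id"}, hs))
	// map[kafka.header.tenant-id:acme kafka.header.trace-id:abc123]
}
```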

Offset Storage

Parameter | Type | Default | Description
--- | --- | --- | ---
offset_store_path | string | ${COL_HOME}/metadata/kafkalegacy_offsets.json | File path on the collector host where consumed offsets are persisted. The directory must be writable by the collector user.
offset_commit_interval | duration | 5s | How often committed offsets are flushed to disk. Shorter intervals reduce the number of records re-delivered after a crash but increase disk I/O.
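Local offset storage only limits re-delivery if the file survives a crash in a consistent state. One common approach is to write to a temporary file in the same directory and then rename it over the target. The Go sketch below shows that approach; it is not necessarily what the receiver does.

The "topic/partition" to offset layout and all helper names are hypothetical, because the real file schema is not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// offsets maps "topic/partition" to the next offset to read (hypothetical layout).
type offsets map[string]int64

func encodeOffsets(o offsets) ([]byte, error) { return json.MarshalIndent(o, "", "  ") }

func decodeOffsets(b []byte) (offsets, error) {
	o := offsets{}
	if err := json.Unmarshal(b, &o); err != nil {
		return nil, err
	}
	return o, nil
}

// saveOffsets writes a temp file next to the target, syncs it, and renames
// it into place, so a crash mid-write leaves the previous file intact
// (rename is atomic within one POSIX filesystem).
func saveOffsets(path string, o offsets) error {
	b, err := encodeOffsets(o)
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(path), ".offsets-*.tmp")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless failure once the rename has succeeded
	if _, err := tmp.Write(b); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// loadOffsets returns an empty map when the file does not exist yet,
// which is the case where initial_offset applies.
func loadOffsets(path string) (offsets, error) {
	b, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return offsets{}, nil
	}
	if err != nil {
		return nil, err
	}
	return decodeOffsets(b)
}

func main() {
	dir, err := os.MkdirTemp("", "kafkalegacy")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "kafkalegacy_offsets.json")

	before, err := loadOffsets(path) // no file yet: empty map
	if err != nil {
		panic(err)
	}
	fmt.Println(len(before)) // 0

	if err := saveOffsets(path, offsets{"app.events/0": 1042, "app.events/1": 977}); err != nil {
		panic(err)
	}
	after, err := loadOffsets(path)
	if err != nil {
		panic(err)
	}
	fmt.Println(after["app.events/0"]) // 1042
}
```

In a receiver, saveOffsets would run on a timer that fires every offset_commit_interval. Records consumed after the last flush are read again after a crash, which is the re-delivery window the table describes.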

ZooKeeper Discovery (optional)

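Configure the zookeeper block to have the receiver discover brokers, partitions, or both from ZooKeeper instead of using the static brokers and partitions lists. Discovery is enabled separately with discover_brokers and discover_partitions. Whenever either is true, zookeeper.servers must be set.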

Parameter | Type | Default | Description
--- | --- | --- | ---
zookeeper.servers | string[] | | ZooKeeper ensemble addresses (host:port, typically port 2181). Required when any other zookeeper field is set.
zookeeper.chroot | string | | ZooKeeper chroot path used by the Kafka cluster (e.g. /kafka).
zookeeper.session_timeout | duration | ZK default | Session timeout for the ZK connection.
zookeeper.discover_brokers | bool | false | When true, the receiver discovers bootstrap brokers from ZK and brokers becomes optional.
zookeeper.discover_partitions | bool | false | When true, the receiver discovers partition assignments from ZK and partitions becomes optional.
zookeeper.refresh_interval | duration | | How often the receiver re-reads broker/partition info from ZK.
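Kafka clusters that use ZooKeeper store two kinds of znodes that matter here, both relative to the chroot:

- Each broker is registered under /brokers/ids/<id>.
- Each topic's partition-to-replica map is stored under /brokers/topics/<topic>.

The Go sketch below parses that znode JSON into the same shapes as the static brokers and partitions parameters. Fetching the znodes requires a ZooKeeper client library, so that step is omitted. The function names are hypothetical, and how the receiver itself consumes the result is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"path"
	"sort"
	"strconv"
	"strings"
)

// brokerZnode is the subset of /brokers/ids/<id> this sketch reads.
type brokerZnode struct {
	Host      string   `json:"host"`
	Port      int      `json:"port"`
	Endpoints []string `json:"endpoints"` // e.g. "PLAINTEXT://host:9092" in newer Kafka versions
}

// topicZnode is the subset of /brokers/topics/<topic>: partition ID -> replica broker IDs.
type topicZnode struct {
	Partitions map[string][]int32 `json:"partitions"`
}

// zkPath joins the chroot with a Kafka-relative path such as "/brokers/ids".
func zkPath(chroot, p string) string { return path.Join("/", chroot, p) }

// brokerAddr turns broker znode data into a host:port bootstrap address,
// falling back to the first listener endpoint when host/port are unset.
func brokerAddr(data []byte) (string, error) {
	var b brokerZnode
	if err := json.Unmarshal(data, &b); err != nil {
		return "", err
	}
	if b.Host != "" && b.Port > 0 {
		return net.JoinHostPort(b.Host, strconv.Itoa(b.Port)), nil
	}
	for _, ep := range b.Endpoints {
		if i := strings.Index(ep, "://"); i >= 0 {
			return ep[i+3:], nil
		}
	}
	return "", fmt.Errorf("no usable address in broker znode")
}

// topicPartitions returns partition IDs as a numerically sorted "0,1,2" string,
// the same shape as the static partitions parameter.
func topicPartitions(data []byte) (string, error) {
	var t topicZnode
	if err := json.Unmarshal(data, &t); err != nil {
		return "", err
	}
	ids := make([]int, 0, len(t.Partitions))
	for k := range t.Partitions {
		n, err := strconv.Atoi(k)
		if err != nil {
			return "", fmt.Errorf("bad partition key %q", k)
		}
		ids = append(ids, n)
	}
	sort.Ints(ids)
	parts := make([]string, len(ids))
	for i, n := range ids {
		parts[i] = strconv.Itoa(n)
	}
	return strings.Join(parts, ","), nil
}

func main() {
	fmt.Println(zkPath("/kafka", "/brokers/ids")) // /kafka/brokers/ids

	addr, _ := brokerAddr([]byte(`{"version":1,"host":"broker1.kafka.svc","port":9092}`))
	fmt.Println(addr) // broker1.kafka.svc:9092

	parts, _ := topicPartitions([]byte(`{"version":1,"partitions":{"2":[1,2],"0":[2,3],"1":[3,1]}}`))
	fmt.Println(parts) // 0,1,2
}
```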

Example Configuration

{
  "brokers": ["broker1.kafka.svc:9092", "broker2.kafka.svc:9092"],
  "topics": ["app.events", "app.audit"],
  "partitions": "0,1,2,3",
  "encoding": "json",
  "initial_offset": "earliest",
  "client_id": "praxis-collector-legacy",

  "workers": 4,
  "fetch_min_bytes": 1024,
  "fetch_max_bytes": 4194304,
  "fetch_max_wait": "500ms",
  "metadata_refresh_interval": "5m",
  "propagate_record_metadata": true,

  "extract_headers": true,
  "header_keys": ["trace-id", "tenant-id"],

  "offset_store_path": "/var/lib/praxis-collector/kafka_offsets.json",
  "offset_commit_interval": "10s"
}

With ZooKeeper discovery

{
  "topics": ["app.events"],
  "encoding": "json",
  "zookeeper": {
    "servers": ["zk-0.zk.svc:2181", "zk-1.zk.svc:2181", "zk-2.zk.svc:2181"],
    "chroot": "/kafka",
    "discover_brokers": true,
    "discover_partitions": true,
    "refresh_interval": "1m"
  }
}
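The Required column encodes a few conditional rules. The Go sketch below checks them against a decoded configuration. Assumptions:

- The struct types are hypothetical.
- Durations are left as strings.
- Treating the mere presence of the zookeeper block as "another zookeeper field is set" is a simplification.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// zkConfig and config mirror the parameter names in the tables (hypothetical types).
type zkConfig struct {
	Servers            []string `json:"servers"`
	Chroot             string   `json:"chroot"`
	DiscoverBrokers    bool     `json:"discover_brokers"`
	DiscoverPartitions bool     `json:"discover_partitions"`
	RefreshInterval    string   `json:"refresh_interval"`
}

type config struct {
	Brokers       []string  `json:"brokers"`
	Topics        []string  `json:"topics"`
	Partitions    string    `json:"partitions"`
	Encoding      string    `json:"encoding"`
	InitialOffset string    `json:"initial_offset"`
	Zookeeper     *zkConfig `json:"zookeeper"`
}

var encodings = map[string]bool{
	"raw": true, "text": true, "json": true, "json_object": true,
	"json_array": true, "json_lines": true, "ndjson": true,
}

// validate applies the "required unless" rules from the tables.
func (c *config) validate() error {
	zk := c.Zookeeper
	if zk == nil {
		zk = &zkConfig{} // no block: both discovery flags are false
	}
	if len(c.Topics) == 0 {
		return errors.New("topics: at least one topic is required")
	}
	if len(c.Brokers) == 0 && !zk.DiscoverBrokers {
		return errors.New("brokers is required unless zookeeper.discover_brokers is true")
	}
	if c.Partitions == "" && !zk.DiscoverPartitions {
		return errors.New("partitions is required unless zookeeper.discover_partitions is true")
	}
	if c.Zookeeper != nil && len(zk.Servers) == 0 {
		return errors.New("zookeeper.servers is required when any other zookeeper field is set")
	}
	if c.Encoding != "" && !encodings[c.Encoding] {
		return fmt.Errorf("unknown encoding %q", c.Encoding)
	}
	if c.InitialOffset != "" && c.InitialOffset != "latest" && c.InitialOffset != "earliest" {
		return fmt.Errorf("initial_offset must be latest or earliest, got %q", c.InitialOffset)
	}
	return nil
}

func main() {
	raw := `{
  "topics": ["app.events"],
  "encoding": "json",
  "zookeeper": {
    "servers": ["zk-0.zk.svc:2181", "zk-1.zk.svc:2181", "zk-2.zk.svc:2181"],
    "chroot": "/kafka",
    "discover_brokers": true,
    "discover_partitions": true,
    "refresh_interval": "1m"
  }
}`
	var c config
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.validate()) // <nil>
}
```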

Limitations

  • No consumer group and no rebalance. Each receiver instance must own a fixed partition set. Do not run two receivers against the same topic and partition at the same time: each one tracks its own offsets, so both deliver every record and downstream sees duplicates.
  • Local offset storage is per-host. If the collector pod moves to a different node and the offset file is not on a persistent volume, the receiver starts over from initial_offset. With earliest it re-reads everything still retained in the partition; with latest it skips records that had not been consumed before the move.
  • Logs only. Decoding metrics or traces from the record value is not supported by this receiver; use the standard Kafka source for those signals.
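Nothing coordinates partition ownership, so overlapping assignments have to be caught before deployment. The Go sketch below is a hypothetical pre-deploy check. It assumes each receiver reads every listed partition of every listed topic, which is how a single topics list and a single partitions list appear to combine.

```go
package main

import "fmt"

// assignment is one receiver instance's static ownership (hypothetical type).
type assignment struct {
	Name       string
	Topics     []string
	Partitions []int32
}

// findOverlaps reports every topic/partition claimed by more than one receiver,
// assuming each receiver owns the cross product of its topics and partitions.
func findOverlaps(as []assignment) []string {
	owner := make(map[string]string) // "topic/partition" -> first receiver seen
	var conflicts []string
	for _, a := range as {
		for _, t := range a.Topics {
			for _, p := range a.Partitions {
				key := fmt.Sprintf("%s/%d", t, p)
				if prev, ok := owner[key]; ok {
					conflicts = append(conflicts, fmt.Sprintf("%s owned by %s and %s", key, prev, a.Name))
					continue
				}
				owner[key] = a.Name
			}
		}
	}
	return conflicts
}

func main() {
	fmt.Println(findOverlaps([]assignment{
		{"collector-a", []string{"app.events"}, []int32{0, 1}},
		{"collector-b", []string{"app.events"}, []int32{1, 2}},
	}))
	// [app.events/1 owned by collector-a and collector-b]
}
```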