Apache Kafka
Overview
The Apache Kafka integration consumes log events from Kafka topics and forwards them into Intelligent Data Pipeline flows.
Supported platforms
- Linux: Logs
- Windows: Logs
- macOS: Logs
Authentication
The Apache Kafka integration supports two authentication modes: No Authentication (noauth) and Kafka SASL Credentials (kafkasasl).
The SASL mode supports the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 mechanisms.
TLS options are also available to configure transport security.
Basic Configuration
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| brokers | array[string] | Yes | none | List of Kafka broker addresses in host:port format. |
| topic | string | Yes | none | Kafka topic to consume logs from. |
| encoding | string | No | text | Message decoding mode. Supported values include text, json, and otlp_proto_log from the node schema. |
| group_id | string | No | otel-collector | Consumer group identifier. |
| client_id | string | No | otel-collector | Kafka client identifier. |
| initial_offset | string | No | latest | Offset start policy when no committed offset exists: latest or earliest. |
| group_rebalance_strategy | string | No | cooperative-sticky | Partition assignment strategy: cooperative-sticky, range, roundrobin, sticky. |
| protocol_version | string | No | auto-negotiated | Kafka protocol version override (for example 2.0.0). |
| session_timeout | string | No | 10s | Consumer group session timeout. |
| heartbeat_interval | string | No | 3s | Consumer group heartbeat interval. |
| workers | integer | No | 1 | Number of worker goroutines processing partitions in parallel. |
| tls.insecure | boolean | No | false | Disables TLS when set to true. |
| tls.insecure_skip_verify | boolean | No | false | Skips server certificate verification when set to true. |
Example Configuration
{
  "brokers": ["kafka-1:9092", "kafka-2:9092"],        // required
  "topic": "application-logs",                        // required
  "encoding": "json",                                 // default: "text"
  "group_id": "otel-collector",                       // default: "otel-collector"
  "client_id": "otel-collector",                      // default: "otel-collector"
  "initial_offset": "latest",                         // default: "latest"
  "group_rebalance_strategy": "cooperative-sticky",   // default: "cooperative-sticky"
  "protocol_version": "",                             // empty = auto-negotiation
  "session_timeout": "10s",                           // default: "10s"
  "heartbeat_interval": "3s",                         // default: "3s"
  "workers": 1,                                       // default: 1
  "tls": {
    "insecure": false,                                // default: false
    "insecure_skip_verify": false                     // default: false
  }
}
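
Before deploying a configuration, it can help to check it against the parameter table. The following is a minimal sketch, not part of the integration: `check_config` and `parse_duration` are hypothetical helper names, the duration format (`10s`, `500ms`) is an assumption based on the defaults shown, and the `workers >= 1` rule is an assumption as well. The rule that the heartbeat interval be shorter than the session timeout follows general Kafka consumer guidance; whether this integration enforces it is not stated here. `encoding` is not checked against a fixed list because the table only says its supported values "include" the three named.

```python
import re

# Defaults copied from the parameter table.
DEFAULTS = {
    "encoding": "text",
    "group_id": "otel-collector",
    "client_id": "otel-collector",
    "initial_offset": "latest",
    "group_rebalance_strategy": "cooperative-sticky",
    "session_timeout": "10s",
    "heartbeat_interval": "3s",
    "workers": 1,
}

# Parameters whose allowed values the table lists exhaustively.
ALLOWED = {
    "initial_offset": ("latest", "earliest"),
    "group_rebalance_strategy": ("cooperative-sticky", "range", "roundrobin", "sticky"),
}

_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}


def parse_duration(value):
    """Convert a simple duration such as "10s" or "500ms" to seconds (assumed format)."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)(ms|s|m|h)", str(value))
    if match is None:
        raise ValueError(f"unrecognised duration: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNIT_SECONDS[unit]


def check_config(user_config):
    """Merge defaults into user_config and return (effective_config, problems)."""
    problems = []
    config = {**DEFAULTS, **user_config}

    brokers = config.get("brokers")
    if not isinstance(brokers, list) or not brokers:
        problems.append("brokers: required, non-empty list of host:port strings")
    else:
        for broker in brokers:
            host, sep, port = str(broker).rpartition(":")
            if not (sep and host and port.isdigit()):
                problems.append(f"brokers: {broker!r} is not in host:port format")

    if not config.get("topic"):
        problems.append("topic: required")

    for key, allowed in ALLOWED.items():
        if config[key] not in allowed:
            problems.append(f"{key}: {config[key]!r} is not one of {allowed}")

    # Assumption: at least one worker is needed to process partitions.
    if not isinstance(config["workers"], int) or config["workers"] < 1:
        problems.append("workers: expected an integer >= 1")

    try:
        session = parse_duration(config["session_timeout"])
        heartbeat = parse_duration(config["heartbeat_interval"])
        if heartbeat >= session:
            problems.append("heartbeat_interval should be shorter than session_timeout")
    except ValueError as exc:
        problems.append(str(exc))

    return config, problems


# Only the two required parameters are supplied; everything else falls back to defaults.
config, problems = check_config({"brokers": ["kafka-1:9092"], "topic": "application-logs"})
print(config["initial_offset"], problems)
```

Returning a list of problems rather than raising on the first one lets a reviewer see every issue in a single pass.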
Metrics
The Apache Kafka integration exposes source-level and Kafka-specific runtime metrics for monitoring ingestion health and consumer behavior.
| Metric Name | Description |
|---|---|
| collector_source_records_dropped_total | Total records dropped by the source. |
| collector_source_errors_total | Total source-level runtime errors. |
| collector_source_parse_errors_total | Total parsing failures while decoding Kafka payloads. |
| collector_source_connections_active | Current active client connections. |
| collector_kafka_source_consumer_lag | Current consumer lag in records. |
| collector_kafka_source_rebalance_total | Total consumer group rebalance events. |
| collector_kafka_source_batch_process_duration_milliseconds_* | Batch processing latency distribution. |
| collector_kafka_source_record_age_milliseconds_* | Record age distribution at processing time. |
| collector_kafka_source_commit_duration_milliseconds_* | Offset commit latency distribution. |
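
As an illustration of how the distribution metrics might be read, the sketch below computes a mean batch latency from scraped values. It rests on two assumptions that this page does not confirm: that the metrics are available in a Prometheus-style text format, and that the `_*` suffix expands to series such as `_sum` and `_count`. The helper names `parse_samples` and `histogram_mean` are hypothetical, and the sample values are made up.

```python
def parse_samples(text):
    """Parse "name value" lines into a dict, skipping blanks and # comments.

    Simplification: assumes no trailing timestamps on sample lines.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples


def histogram_mean(samples, base):
    """Return sum/count for a distribution metric, or None if nothing was observed."""
    count = samples.get(f"{base}_count", 0.0)
    if count == 0:
        return None
    return samples[f"{base}_sum"] / count


# Hypothetical scrape output; the values are invented for illustration.
SAMPLE = """\
# sample metrics
collector_kafka_source_consumer_lag 1200
collector_kafka_source_rebalance_total 3
collector_kafka_source_batch_process_duration_milliseconds_sum 4500
collector_kafka_source_batch_process_duration_milliseconds_count 300
"""

samples = parse_samples(SAMPLE)
mean_ms = histogram_mean(samples, "collector_kafka_source_batch_process_duration_milliseconds")
print(f"lag={samples['collector_kafka_source_consumer_lag']:.0f} records, mean batch={mean_ms} ms")
```

Watching consumer lag alongside mean batch latency can help distinguish a slow pipeline from a burst of incoming records.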