Apache Kafka

Overview

The Apache Kafka integration consumes log events from Kafka topics and forwards them into Intelligent Data Pipeline flows.

Supported platforms

  • Linux: Logs
  • Windows: Logs
  • macOS: Logs

Authentication

The Apache Kafka integration supports two authentication types: No Authentication (noauth) and Kafka SASL Credentials (kafkasasl). SASL credentials can use the PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 mechanism.

TLS options are also available to configure transport security.

Basic Configuration

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| brokers | array[string] | Yes | none | List of Kafka broker addresses in host:port format. |
| topic | string | Yes | none | Kafka topic to consume logs from. |
| encoding | string | No | text | Message decoding mode. Supported values (from the node schema) include text, json, and otlp_proto_log. |
| group_id | string | No | otel-collector | Consumer group identifier. |
| client_id | string | No | otel-collector | Kafka client identifier. |
| initial_offset | string | No | latest | Offset start policy when no committed offset exists: latest or earliest. |
| group_rebalance_strategy | string | No | cooperative-sticky | Partition assignment strategy: cooperative-sticky, range, roundrobin, or sticky. |
| protocol_version | string | No | auto-negotiated | Kafka protocol version override (for example, 2.0.0). |
| session_timeout | string | No | 10s | Consumer group session timeout. |
| heartbeat_interval | string | No | 3s | Consumer group heartbeat interval. |
| workers | integer | No | 1 | Number of worker goroutines processing partitions in parallel. |
| tls.insecure | boolean | No | false | Disables TLS when set to true. |
| tls.insecure_skip_verify | boolean | No | false | Skips server certificate verification when set to true. |
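
The constraints in the table can be checked before a configuration is deployed. The following is a minimal Python sketch, not part of the integration itself: the function name `validate_kafka_source` and the `HOST_PORT` pattern are hypothetical, the rule that `workers` must be at least 1 is an assumption, and the encoding check treats the three values listed in the table as the complete set, which the table does not guarantee.

```python
import re

# Allowed values copied from the parameter table.
ENCODINGS = {"text", "json", "otlp_proto_log"}
INITIAL_OFFSETS = {"latest", "earliest"}
REBALANCE_STRATEGIES = {"cooperative-sticky", "range", "roundrobin", "sticky"}

# Hypothetical "host:port" pattern; it does not handle bracketed IPv6 literals.
HOST_PORT = re.compile(r"^[^\s:]+:(\d{1,5})$")


def validate_kafka_source(config: dict) -> list:
    """Return a list of problems found; an empty list means none were found."""
    problems = []

    brokers = config.get("brokers")
    if not isinstance(brokers, list) or not brokers:
        problems.append("brokers: required, must be a non-empty list")
    else:
        for broker in brokers:
            match = HOST_PORT.match(broker) if isinstance(broker, str) else None
            if match is None or not 0 < int(match.group(1)) <= 65535:
                problems.append(f"brokers: {broker!r} is not in host:port format")

    topic = config.get("topic")
    if not isinstance(topic, str) or not topic:
        problems.append("topic: required, must be a non-empty string")

    # Optional enumerated fields fall back to the documented defaults.
    checks = [
        ("encoding", "text", ENCODINGS),
        ("initial_offset", "latest", INITIAL_OFFSETS),
        ("group_rebalance_strategy", "cooperative-sticky", REBALANCE_STRATEGIES),
    ]
    for key, default, allowed in checks:
        value = config.get(key, default)
        if not isinstance(value, str) or value not in allowed:
            problems.append(f"{key}: {value!r} is not one of {sorted(allowed)}")

    # Assumption: at least one worker is needed; bool is excluded because
    # Python treats True/False as integers.
    workers = config.get("workers", 1)
    if isinstance(workers, bool) or not isinstance(workers, int) or workers < 1:
        problems.append("workers: must be an integer >= 1")

    return problems


if __name__ == "__main__":
    print(validate_kafka_source({"brokers": ["kafka-1:9092"], "topic": "application-logs"}))
```

Returning a list of problems rather than raising on the first one lets a caller report every issue in a single pass.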

Example Configuration

{
  "brokers": ["kafka-1:9092", "kafka-2:9092"], // required
  "topic": "application-logs", // required

  "encoding": "json", // default: "text"
  "group_id": "otel-collector", // default: "otel-collector"
  "client_id": "otel-collector", // default: "otel-collector"
  "initial_offset": "latest", // default: "latest"
  "group_rebalance_strategy": "cooperative-sticky", // default: "cooperative-sticky"
  "protocol_version": "", // empty = auto-negotiation
  "session_timeout": "10s", // default: "10s"
  "heartbeat_interval": "3s", // default: "3s"
  "workers": 1, // default: 1

  "tls": {
    "insecure": false, // default: false
    "insecure_skip_verify": false // default: false
  }
}
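
The `//` annotations in the example are not valid in strict JSON, and this page does not say whether the integration accepts them. If the example is fed to a strict parser such as Python's `json` module, the comments have to be removed first. The sketch below shows one way to do that; `strip_line_comments` is a hypothetical helper, and the embedded sample is a shortened copy of the example above.

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove // comments that appear outside of JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            # Inside a string, copy everything and track escapes so that
            # an escaped quote does not end the string early.
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line, keeping the newline itself.
            newline = text.find("\n", i)
            i = len(text) if newline == -1 else newline
        else:
            out.append(ch)
            i += 1
    return "".join(out)


annotated = """
{
  "brokers": ["kafka-1:9092", "kafka-2:9092"], // required
  "topic": "application-logs", // required
  "tls": {
    "insecure": false // default: false
  }
}
"""

config = json.loads(strip_line_comments(annotated))

if __name__ == "__main__":
    print(config["brokers"])
```

Tracking string state matters because a value such as a URL can legitimately contain `//`, and a plain regular-expression strip would truncate it.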

Metrics

The Apache Kafka integration exposes source-level and Kafka-specific runtime metrics for monitoring ingestion health and consumer behavior.

| Metric Name | Description |
| --- | --- |
| collector_source_records_dropped_total | Total records dropped by the source. |
| collector_source_errors_total | Total source-level runtime errors. |
| collector_source_parse_errors_total | Total parsing failures while decoding Kafka payloads. |
| collector_source_connections_active | Current active client connections. |
| collector_kafka_source_consumer_lag | Current consumer lag in records. |
| collector_kafka_source_rebalance_total | Total consumer group rebalance events. |
| collector_kafka_source_batch_process_duration_milliseconds_* | Batch processing latency distribution. |
| collector_kafka_source_record_age_milliseconds_* | Record age distribution at processing time. |
| collector_kafka_source_commit_duration_milliseconds_* | Offset commit latency distribution. |
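
The `_*` suffix on the distribution metrics is not expanded on this page. The sketch below assumes, without confirmation from the source, that these metrics are exported in the Prometheus text exposition format as histograms with `_bucket`, `_sum`, and `_count` series, and that lines carry no trailing timestamp. Under those assumptions, a mean latency can be derived as sum divided by count. The function `mean_from_histogram` is hypothetical, and the sample values are invented purely for illustration.

```python
from typing import Optional


def mean_from_histogram(exposition: str, base_name: str) -> Optional[float]:
    """Compute sum/count for one histogram across all of its label sets."""
    total = 0.0
    count = 0.0
    for line in exposition.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # Assumes "name{labels} value" with no trailing timestamp.
        series, _, value = line.rpartition(" ")
        name = series.split("{", 1)[0]
        if name == base_name + "_sum":
            total += float(value)
        elif name == base_name + "_count":
            count += float(value)
    return total / count if count else None


# Invented sample values, for illustration only.
sample = """
# TYPE collector_kafka_source_commit_duration_milliseconds histogram
collector_kafka_source_commit_duration_milliseconds_bucket{le="10"} 3
collector_kafka_source_commit_duration_milliseconds_bucket{le="+Inf"} 4
collector_kafka_source_commit_duration_milliseconds_sum 48
collector_kafka_source_commit_duration_milliseconds_count 4
"""

if __name__ == "__main__":
    base = "collector_kafka_source_commit_duration_milliseconds"
    print(mean_from_histogram(sample, base))
```

A mean hides tail latency, so if the bucket series are available, percentile estimates from the buckets are usually the better signal for alerting.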