
Pipeline Processing Architecture

Pipeline Syntax

A pipeline JSON document has two components: nodes and connections.

Nodes

```json
{
  "name": "node-01",
  "type": "source",
  "node_ref": "file_log",
  "config": {
    "include_path": ["/var/log/*.log"]
  },
  "outputs": ["out"],
  "multiprocessor": "multiprocessor-01",
  "supports": ["metrics", "logs", "traces"],
  "active": ["metrics", "logs"],
  "credential_ref": "cred_name"
}
```
| Name | Acceptable Values | Mandatory | Description |
|---|---|---|---|
| name | Non-empty string | Yes | User-defined node name, as configured in the UI |
| type | source, processor, destination, pack, connector | Yes | Node category |
| node_ref | Valid node definition identifier | Yes | Reference to the node definition, used to classify the node |
| config | JSON object matching the node's schema | Yes | Node configuration: the JSONForms output data for the node |
| inputs | Array of input port names | Depends (mandatory for destinations) | Input ports supported by the node |
| outputs | Array of output port names | Depends (mandatory for sources) | Output ports supported by the node |
| multiprocessor | String (processors only) | No | Name of the multiprocessor the node belongs to, if any; applies only when the node is a processor grouped into a multiprocessor in the UI |
| supports | Array of metrics, logs, traces | Yes | Signals the node can handle |
| active | Array of metrics, logs, traces | Yes | Signals the node is currently configured for |
| credential_ref | Non-empty string | No | Set only when the source requires credentials |

Connections

```json
{
  "to": "batch:in",
  "from": "devnull:out"
}
```

A connection defines a directed edge between two nodes: from names the upstream node and its output port, and to names the downstream node and its input port. Data always flows from -> to.

Values of "to" and "from" use the format

node_name:port_name

Example

```json
{
  "nodes": [
    {
      "name": "user defined node name",
      "type": "source|processor|destination|pack|connector",
      "node_ref": "backend node_ref id for the node",
      "config": {},
      "outputs": ["out"],
      "multiprocessor": "name of multiprocessor if part of any",
      "supports": ["metrics", "logs", "traces"],
      "active": ["metrics", "logs"]
    }
  ],
  "connections": [
    {
      "to": "batch-1756882307193-wactgn:in",
      "from": "windows_events_system:out"
    }
  ]
}
```
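Because connections refer to nodes by name, a concrete pipeline can be checked for dangling references. This Go sketch decodes only the fields it needs (the Pipeline type and danglingRefs are hypothetical names) and reports every endpoint whose node part names no node in the document. The example above uses placeholder names, so a check like this is meant for concrete pipelines, not for that schematic.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Pipeline keeps only the fields needed to check connections.
type Pipeline struct {
	Nodes []struct {
		Name string `json:"name"`
	} `json:"nodes"`
	Connections []struct {
		To   string `json:"to"`
		From string `json:"from"`
	} `json:"connections"`
}

// danglingRefs returns every connection endpoint whose node_name part
// does not match a node declared in the same document.
func danglingRefs(doc []byte) ([]string, error) {
	var p Pipeline
	if err := json.Unmarshal(doc, &p); err != nil {
		return nil, err
	}
	names := make(map[string]bool, len(p.Nodes))
	for _, n := range p.Nodes {
		names[n.Name] = true
	}
	var missing []string
	for _, c := range p.Connections {
		for _, ep := range []string{c.From, c.To} {
			node := ep
			if i := strings.LastIndex(ep, ":"); i >= 0 {
				node = ep[:i] // drop the port name
			}
			if !names[node] {
				missing = append(missing, ep)
			}
		}
	}
	return missing, nil
}

func main() {
	doc := []byte(`{
	  "nodes": [{"name": "windows_events_system"}, {"name": "batch-1"}],
	  "connections": [{"to": "batch-1:in", "from": "windows_events_system:out"}]
	}`)
	missing, err := danglingRefs(doc)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(missing)) // 0
}
```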

Packs Definition

A pack is a logical construct that contains a mini pipeline of its own.

Within a complete pipeline, a pack is represented as a single node:

Note: A pack is always restricted to a single signal type: logs, metrics, or traces.

```json
{
  "name": "pack-1",
  "type": "pack",
  "pack_ref": "pack-1",
  "pack_version": "2.0.0",
  "inputs": ["in_1", "in_2"],
  "outputs": ["out"],
  "supports": ["metrics"],
  "active": ["metrics"]
}
```
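The single-signal rule for packs can be enforced on the pack node itself. In this Go sketch, PackNode and packSignal are hypothetical names, and the sketch reads the note strictly: supports and active must each hold exactly one signal, and it must be the same one. That strictness is an assumption about how the rule is applied.

```go
package main

import "fmt"

// PackNode mirrors the pack entry shown above (hypothetical type).
type PackNode struct {
	Name        string   `json:"name"`
	Type        string   `json:"type"`
	PackRef     string   `json:"pack_ref"`
	PackVersion string   `json:"pack_version"`
	Inputs      []string `json:"inputs"`
	Outputs     []string `json:"outputs"`
	Supports    []string `json:"supports"`
	Active      []string `json:"active"`
}

var signals = map[string]bool{"logs": true, "metrics": true, "traces": true}

// packSignal returns the pack's one signal, or an error if the pack
// is not restricted to exactly one known signal.
func packSignal(p PackNode) (string, error) {
	if p.Type != "pack" {
		return "", fmt.Errorf("%q is not a pack", p.Name)
	}
	if len(p.Supports) != 1 || len(p.Active) != 1 || p.Supports[0] != p.Active[0] {
		return "", fmt.Errorf("pack %q must be restricted to a single signal", p.Name)
	}
	sig := p.Supports[0]
	if !signals[sig] {
		return "", fmt.Errorf("pack %q: unknown signal %q", p.Name, sig)
	}
	return sig, nil
}

func main() {
	p := PackNode{Name: "pack-1", Type: "pack", PackRef: "pack-1", PackVersion: "2.0.0",
		Inputs: []string{"in_1", "in_2"}, Outputs: []string{"out"},
		Supports: []string{"metrics"}, Active: []string{"metrics"}}
	sig, err := packSignal(p)
	fmt.Println(sig, err) // metrics <nil>
}
```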

The pack's own definition, that is, the mini pipeline inside it, uses the same nodes and connections structure as a complete pipeline:

```json
{
  "nodes": [
    {
      "name": "proc-1",
      "type": "processor",
      "node_ref": "batch",
      "config": {},
      "outputs": ["out"],
      "multiprocessor": "name of multiprocessor if part of any",
      "supports": ["metrics", "logs", "traces"],
      "active": ["metrics"]
    },
    {
      "name": "proc-2",
      "type": "processor",
      "node_ref": "batch",
      "config": {},
      "outputs": ["out"],
      "multiprocessor": "name of multiprocessor if part of any",
      "supports": ["metrics", "logs", "traces"],
      "active": ["metrics"]
    }
  ],
  "connections": [
    {
      "to": "proc-2:in",
      "from": "proc-1:out"
    }
  ]
}
```

Predefined Source or Destination nodes

To substitute a predefined source or destination node, reference it through template_ref:

```json
{
  "nodes": [
    {
      "name": "user defined node name",
      "type": "source|destination",
      "template_ref": "title_of_predefined_source/dest",
      "inputs": ["in"],
      "outputs": ["out"]
    }
  ],
  "connections": [
    {
      "to": "batch-1756882307193-wactgn:in",
      "from": "windows_events_system:out"
    }
  ]
}
```

The payload for a predefined node is the same as for a regular node, with one difference: it carries a template_ref key naming the predefined source or destination to use, and it has no config.

Kubernetes-Aware Conversion

For pipelines that use K8s source nodes (k8s_logs, k8s_kubeletstats, k8s_hostmetrics, k8s_cluster, k8s_events, k8s_appmetrics, k8s_otlp), a separate conversion path produces per-deployment OpenTelemetryCollector CRs that the in-cluster Praxis Collector supervisor applies.
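Routing a pipeline onto this path amounts to checking whether it contains any of the listed K8s sources. The Go sketch below assumes that the listed names are node_ref values on source nodes and that one K8s source is enough to trigger the K8s-aware path; needsK8sConversion and the trimmed Node type are hypothetical.

```go
package main

import "fmt"

// k8sSourceRefs lists the K8s source nodes named in this section,
// assumed here to be node_ref values.
var k8sSourceRefs = map[string]bool{
	"k8s_logs": true, "k8s_kubeletstats": true, "k8s_hostmetrics": true,
	"k8s_cluster": true, "k8s_events": true, "k8s_appmetrics": true, "k8s_otlp": true,
}

// Node keeps only the fields needed for the check.
type Node struct {
	Name    string
	Type    string
	NodeRef string
}

// needsK8sConversion reports whether any source node is a K8s source.
func needsK8sConversion(nodes []Node) bool {
	for _, n := range nodes {
		if n.Type == "source" && k8sSourceRefs[n.NodeRef] {
			return true
		}
	}
	return false
}

func main() {
	nodes := []Node{
		{Name: "pod-logs", Type: "source", NodeRef: "k8s_logs"},
		{Name: "batch-1", Type: "processor", NodeRef: "batch"},
	}
	fmt.Println(needsK8sConversion(nodes)) // true
}
```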

Pipeline

  1. Relationship.CovertToOTEL(): builds the generic OTel config, exactly as for non-K8s pipelines.
  2. Relationship.ConvertToK8sDeploymentInputs() (k8s_convert.go): slices the typed OTel config into per-deployment groups via groupPipelinesByDeployment, keyed by the receiver's K8s kind (e.g. logs, kubeletstats, events).
  3. mergeNodeAgentGroups (k8s_convert_node_agent.go): on by default; opt out with ADL_K8S_MERGE_NODE_AGENT=false. When two or more of {logs, kubeletstats, hostmetrics} are present, it collapses them into a single node-agent group, so the fleet runs one DaemonSet pod per node instead of one per signal (up to three). This halves per-node baseline overhead and the OpAMP agent count; the cost is a coupled rollout cadence and a wider blast radius for an OOM or crash.
  4. buildSignalConfig: a per-group filter that pulls in only the receivers, processors, exporters, and extensions referenced by that group's pipelines. It works identically for split and merged groups.
  5. createColDeploymentWithMeta (internal/control-plane/k8s/packaging.go): wraps the filtered config in an OpenTelemetryCollector CR. The mode (DaemonSet, Deployment, or StatefulSet) comes from K8sDeployMeta. Pod-level mounts and env vars are derived from the CR's actual receivers via addReceiverDrivenVolumes (internal/control-plane/k8s/node_agent.go): filelog adds the four log-tailing host paths plus COL_HOME, and hostmetrics adds /hostfs. The same logic runs for split and merged CRs.
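The merge rule in step 3 can be modelled on a simplified representation in which each deployment group is just a list of OTel pipeline names keyed by receiver kind. The real mergeNodeAgentGroups operates on the typed OTel config, so this Go sketch is an illustration only: mergeGroups, mergeEnabled, and the "node-agent" group key are hypothetical, and treating only the exact string "false" as an opt-out is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"sort"
)

// nodeAgentKinds are the per-node receiver kinds eligible for merging.
var nodeAgentKinds = []string{"logs", "kubeletstats", "hostmetrics"}

// mergeEnabled models the opt-out: merging stays on unless the env var
// is exactly "false" (an assumption of this sketch).
func mergeEnabled() bool {
	return os.Getenv("ADL_K8S_MERGE_NODE_AGENT") != "false"
}

// mergeGroups folds two or more node-agent kinds into one "node-agent"
// group; other kinds (e.g. "events") pass through unchanged.
func mergeGroups(groups map[string][]string, enabled bool) map[string][]string {
	out := make(map[string][]string, len(groups))
	for k, v := range groups {
		out[k] = v
	}
	if !enabled {
		return out
	}
	var present []string
	for _, k := range nodeAgentKinds {
		if _, ok := groups[k]; ok {
			present = append(present, k)
		}
	}
	if len(present) < 2 {
		return out // a lone node-agent kind keeps its own CR
	}
	var merged []string
	for _, k := range present {
		merged = append(merged, groups[k]...)
		delete(out, k)
	}
	sort.Strings(merged)
	out["node-agent"] = merged
	return out
}

func main() {
	groups := map[string][]string{
		"logs":         {"logs/k8s"},
		"kubeletstats": {"metrics/kubeletstats"},
		"events":       {"logs/events"},
	}
	fmt.Println(mergeGroups(groups, mergeEnabled()))
}
```

Keeping the lone-kind case unmerged mirrors the "two or more" condition in step 3: with a single node-agent kind there is nothing to collapse.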

Migration safety

The file_storage hostPath is keyed on pipeline name (/var/lib/praxis-edge-collector/<pipeline>), not on the per-signal CR name. This is load-bearing: when a pipeline flips from split CRs (<pipeline>-logs) to the merged <pipeline>-node-agent, the new pod re-uses the same on-node bbolt offset DB instead of starting fresh and re-tailing every log file from byte 0. Pinned by TestAddReceiverDrivenVolumes_FileStoragePathStableAcrossSignalRename.
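The property the test pins can be shown in a few lines of Go. This sketch is not the code under test: fileStorageHostPath and the counterexample crKeyedPath are hypothetical helpers. It shows why keying on the pipeline name survives a CR rename while keying on the CR name would not.

```go
package main

import (
	"fmt"
	"path"
)

const fileStorageRoot = "/var/lib/praxis-edge-collector"

// fileStorageHostPath keys the bbolt offset directory on the pipeline
// name only, so split and merged CRs of one pipeline share it.
func fileStorageHostPath(pipeline string) string {
	return path.Join(fileStorageRoot, pipeline)
}

// crKeyedPath is a counterexample: keyed on the CR name, the path would
// change when <pipeline>-logs is renamed to <pipeline>-node-agent.
func crKeyedPath(crName string) string {
	return path.Join(fileStorageRoot, crName)
}

func main() {
	fmt.Println(fileStorageHostPath("web"))                               // /var/lib/praxis-edge-collector/web
	fmt.Println(crKeyedPath("web-logs") == crKeyedPath("web-node-agent")) // false
}
```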

Rollout posture

  • ADL_K8S_MERGE_NODE_AGENT defaults to on. Supervisor-side reconcile now sequences Create new → wait Ready → Delete old, so the split→merged CR rename no longer drops signals. Set the env var to false to fall back to per-signal CRs (e.g. for tenants where per-signal blast-radius isolation matters more than per-node pod count).
  • The supervisor side does not special-case any signal name; it treats each k8s/<signal> config map entry as an opaque CR. Older supervisors handle merged CRs the same as split ones.

K8sDeployMeta precedence

For a split (per-signal) group, extractDeployMetaForSignal reads the meta from the K8s receiver node's config via the receiver's ExtractDeployMeta. For the merged node-agent group, mergeDeployMetaForNodeAgent takes the union of its members: TolerateTaints is OR'd, PriorityClassName takes the first non-empty value, and LogPath, CRIType, and RecombineLogs come from the logs receiver's meta. A pipeline that sets apply_tolerations: true on any member receiver therefore gets a tolerant DaemonSet for the merged pod. This matters because otherwise log collection on tainted nodes would silently stop after the merge.
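The precedence rules can be written as a small fold over the group's members. In this Go sketch, DeployMeta, member, and mergeNodeAgentMeta are hypothetical stand-ins for K8sDeployMeta and mergeDeployMetaForNodeAgent; the field types are assumptions, and apply_tolerations is assumed to surface as TolerateTaints. "First non-empty" depends on member order, which the caller controls here.

```go
package main

import "fmt"

// DeployMeta models the K8sDeployMeta fields named above (types assumed).
type DeployMeta struct {
	TolerateTaints    bool
	PriorityClassName string
	LogPath           string
	CRIType           string
	RecombineLogs     bool
}

// member pairs a receiver kind with the meta extracted from its node.
type member struct {
	Kind string
	Meta DeployMeta
}

// mergeNodeAgentMeta applies the documented precedence rules.
func mergeNodeAgentMeta(members []member) DeployMeta {
	var out DeployMeta
	for _, m := range members {
		// Any tolerant member makes the merged DaemonSet tolerant.
		out.TolerateTaints = out.TolerateTaints || m.Meta.TolerateTaints
		if out.PriorityClassName == "" {
			out.PriorityClassName = m.Meta.PriorityClassName
		}
		// Log-specific settings come only from the logs receiver.
		if m.Kind == "logs" {
			out.LogPath, out.CRIType, out.RecombineLogs = m.Meta.LogPath, m.Meta.CRIType, m.Meta.RecombineLogs
		}
	}
	return out
}

func main() {
	got := mergeNodeAgentMeta([]member{
		{Kind: "kubeletstats", Meta: DeployMeta{PriorityClassName: "high"}},
		{Kind: "logs", Meta: DeployMeta{TolerateTaints: true, LogPath: "/var/log/pods", CRIType: "containerd"}},
	})
	fmt.Printf("%+v\n", got)
}
```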