Pipeline Processing Architecture
Pipeline Syntax
A pipeline JSON document has two components: nodes and connections.
Nodes
{
"name": "node-01",
"type": "source",
"node_ref": "file_log",
"config": {
"include_path": ["/var/log/*.log"]
},
"outputs": ["out"],
"multiprocessor": "multiprocessor-01",
"supports": ["metrics", "logs", "traces"],
"active": ["metrics", "logs"],
"credential_ref": "cred_name"
}
| Name | Acceptable Values | Mandatory | Description |
|---|---|---|---|
| name | non-empty string | Yes | user-defined node name, as configured in the UI |
| type | source, processor, destination, pack, connector | Yes | node category |
| node_ref | valid node definition identifier | Yes | reference to the node definition that classifies the node |
| config | JSON object per node schema | Yes | JSONForms output data for the node |
| inputs | array of input port names | Depends (mandatory for destinations) | list of input ports supported by the node |
| outputs | array of output port names | Depends (mandatory for sources) | list of output ports supported by the node |
| multiprocessor | string (processor-only) | No | name of the multiprocessor the node belongs to (applies only when type is processor and the node is part of a multiprocessor in the UI) |
| supports | metrics, logs, traces | Yes | the set of signals the node can handle |
| active | metrics, logs, traces | Yes | the set of signals the node is configured for |
| credential_ref | non-empty string | No | optional; set only when the source requires credentials |
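The rules in the table can be checked mechanically. The following is a minimal sketch, assuming an ordinary node (not a pack or a predefined node, which use different keys); the function name `validate_node` and the error messages are hypothetical and not part of the backend.

```python
VALID_TYPES = {"source", "processor", "destination", "pack", "connector"}
VALID_SIGNALS = {"metrics", "logs", "traces"}


def validate_node(node: dict) -> list[str]:
    """Return a list of problems; an empty list means the node passed.

    Hypothetical helper that mirrors the table above for ordinary nodes only.
    """
    errors = []
    name = node.get("name")
    if not isinstance(name, str) or not name:
        errors.append("name must be a non-empty string")

    node_type = node.get("type")
    if node_type not in VALID_TYPES:
        errors.append("type is not a known node category")

    if not node.get("node_ref"):
        errors.append("node_ref is mandatory")
    if not isinstance(node.get("config"), dict):
        errors.append("config must be a JSON object")

    # supports and active must be non-empty lists of known signals.
    for key in ("supports", "active"):
        value = node.get(key)
        if not isinstance(value, list) or not value or not set(value) <= VALID_SIGNALS:
            errors.append(f"{key} must list one or more of metrics, logs, traces")

    # Port requirements depend on the node category.
    if node_type == "source" and not node.get("outputs"):
        errors.append("outputs is mandatory for sources")
    if node_type == "destination" and not node.get("inputs"):
        errors.append("inputs is mandatory for destinations")

    # credential_ref is optional, but must be non-empty when present.
    if "credential_ref" in node and not node["credential_ref"]:
        errors.append("credential_ref must be a non-empty string when set")
    return errors
```

Returning a list rather than raising on the first failure lets a caller report every problem with a node in one pass.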
Connections
{
"to": "batch:in",
"from": "devnull:out"
}
Connections define a directed relationship between two nodes: from specifies the source node (the side whose output port emits data) and to specifies the target node (the side whose input port receives it). The connection is always directed from -> to.
The format for the values in "to" and "from" is:
node_name:port_name
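A minimal sketch of splitting an endpoint value into its two parts. The helper name `parse_endpoint` is hypothetical, and splitting on the last colon is an assumption made so that a node name containing a colon would still parse; the source does not say whether such names are allowed.

```python
def parse_endpoint(endpoint: str) -> tuple[str, str]:
    """Split 'node_name:port_name' into (node_name, port_name).

    Hypothetical helper. Splits on the LAST colon (an assumption).
    """
    node_name, sep, port_name = endpoint.rpartition(":")
    if not sep or not node_name or not port_name:
        raise ValueError(f"expected node_name:port_name, got {endpoint!r}")
    return node_name, port_name
```

For example, `parse_endpoint("batch-1756882307193-wactgn:in")` yields the node name `batch-1756882307193-wactgn` and the port name `in`.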
Example
{
"nodes": [
{
"name": "user defined node name",
"type": "source|processor|destination|pack|connector",
"node_ref": "backend node_ref id for the node",
"config": {},
"outputs": ["out"],
"multiprocessor": "name of multiprocessor if part of any",
"supports": ["metrics", "logs", "traces"],
"active": ["metrics", "logs"]
}
],
"connections": [
{
"to": "batch-1756882307193-wactgn:in",
"from": "windows_events_system:out"
}
]
}
Packs Definition
Packs are a logical construct with a mini pipeline defined inside them.
Note: A pack is always restricted to a single signal type: logs, metrics, or traces.
Within a complete pipeline, a pack is represented as follows:
{
"name": "pack-1",
"type": "pack",
"pack_ref": "pack-1",
"pack_version": "2.0.0",
"inputs": ["in_1", "in_2"],
"outputs": ["out"],
"supports": ["metrics"],
"active": ["metrics"]
}
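The single-signal restriction on packs can be expressed as a small check. This is a sketch under one assumption: that the restriction means supports and active together name exactly one signal, as in the pack node shown here. The function name `pack_signal` is hypothetical.

```python
def pack_signal(pack_node: dict) -> str:
    """Return the one signal a pack node handles, or raise ValueError.

    Hypothetical helper; assumes 'supports' and 'active' must agree on a
    single signal for nodes of type 'pack'.
    """
    if pack_node.get("type") != "pack":
        raise ValueError("not a pack node")
    signals = set(pack_node.get("supports", [])) | set(pack_node.get("active", []))
    if len(signals) != 1:
        raise ValueError(f"a pack must use exactly one signal, got {sorted(signals)}")
    return next(iter(signals))
```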
The actual pack definition is as follows:
{
"nodes": [
{
"name": "proc-1",
"type": "processor",
"node_ref": "batch",
"config": {},
"outputs": ["out"],
"multiprocessor": "name of multiprocessor if part of any",
"supports": ["metrics", "logs", "traces"],
"active": ["metrics", "logs"]
},
{
"name": "proc-2",
"type": "processor",
"node_ref": "batch",
"config": {},
"outputs": ["out"],
"multiprocessor": "name of multiprocessor if part of any",
"supports": ["metrics", "logs", "traces"],
"active": ["metrics", "logs"]
}
],
"connections": [
{
"to": "batch:in",
"from": "windows_events_system:out"
}
]
}
Predefined Source or Destination nodes
Substituting a predefined source or destination node
{
"nodes": [
{
"name": "user defined node name",
"type": "source|destination",
"template_ref": "title_of_predefined_source/dest",
"inputs": ["in"],
"outputs": ["out"]
}
],
"connections": [
{
"to": "batch-1756882307193-wactgn:in",
"from": "windows_events_system:out"
}
]
}
The payload for a predefined node is the same as for a normal node, with two differences: it carries a template_ref key naming the predefined source or destination to use, and it has no config.
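A minimal sketch of checking a predefined node payload. The helper name `check_predefined_node` is hypothetical, and treating a present config key as an error (rather than ignoring it) is an assumption; the source only says predefined nodes have no config.

```python
def check_predefined_node(node: dict) -> list[str]:
    """Return problems found in a predefined source/destination payload.

    Hypothetical helper; rejecting 'config' outright is an assumption.
    """
    problems = []
    if node.get("type") not in ("source", "destination"):
        problems.append("predefined nodes must be of type source or destination")
    if not node.get("template_ref"):
        problems.append("template_ref is required")
    if "config" in node:
        problems.append("predefined nodes carry no config")
    return problems
```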
Kubernetes-Aware Conversion
For pipelines that use K8s source nodes (k8s_logs, k8s_kubeletstats, k8s_hostmetrics, k8s_cluster, k8s_events, k8s_appmetrics, k8s_otlp), a separate conversion path produces per-deployment OpenTelemetryCollector CRs that the in-cluster Praxis Collector supervisor applies.
Pipeline
1. Relationship.CovertToOTEL(): generic OTel config build, identical to non-K8s pipelines.
2. Relationship.ConvertToK8sDeploymentInputs() (k8s_convert.go): slices the typed OTel config into per-deployment groups via groupPipelinesByDeployment (keyed by the receiver's K8s kind, e.g. logs, kubeletstats, events).
3. mergeNodeAgentGroups (k8s_convert_node_agent.go): default on; opt out via ADL_K8S_MERGE_NODE_AGENT=false. When two or more of {logs, kubeletstats, hostmetrics} are present, collapses them into a single node-agent group so the fleet runs one DaemonSet pod per node instead of three. Halves per-node baseline overhead and OpAMP agent count; cost is coupled rollout cadence and a wider blast radius for OOM/crash.
4. buildSignalConfig: per-group filter that pulls in only the receivers/processors/exporters/extensions referenced by that group's pipelines. Works identically for split and merged groups.
5. createColDeploymentWithMeta (internal/control-plane/k8s/packaging.go): wraps the filtered config in an OpenTelemetryCollector CR. Mode (DaemonSet/Deployment/StatefulSet) comes from K8sDeployMeta. Pod-level mounts and env vars are derived from the CR's actual receivers via addReceiverDrivenVolumes (internal/control-plane/k8s/node_agent.go): filelog brings the four log-tailing host paths + COL_HOME, and hostmetrics brings /hostfs. Same logic runs for split or merged CRs.
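The node-agent merge step can be sketched as follows. This is an illustration in Python, not the code in k8s_convert_node_agent.go: the data shape (a dict from deployment kind to pipeline names) and both function names are assumptions, and treating every env value other than "false" as "on" is a guess at how the flag is parsed.

```python
import os

# Kinds eligible for collapsing into one DaemonSet, per the step above.
NODE_AGENT_KINDS = {"logs", "kubeletstats", "hostmetrics"}


def merge_enabled(env=None) -> bool:
    """Default on; only the value 'false' opts out (parsing is an assumption)."""
    env = os.environ if env is None else env
    return env.get("ADL_K8S_MERGE_NODE_AGENT", "true").strip().lower() != "false"


def merge_node_agent_groups(groups: dict, enabled: bool = True) -> dict:
    """Collapse eligible groups into 'node-agent' when two or more are present.

    'groups' maps a deployment kind to the list of pipeline names in it
    (a hypothetical shape chosen for this sketch).
    """
    members = [kind for kind in groups if kind in NODE_AGENT_KINDS]
    if not enabled or len(members) < 2:
        return dict(groups)  # nothing to merge; keep per-signal groups
    merged = {k: v for k, v in groups.items() if k not in NODE_AGENT_KINDS}
    merged["node-agent"] = [p for kind in members for p in groups[kind]]
    return merged
```

With groups for logs, kubeletstats, and events, the first two collapse into node-agent and events stays separate; with only one eligible group, the input comes back unchanged.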
Migration safety
The file_storage hostPath is keyed on pipeline name (/var/lib/praxis-edge-collector/<pipeline>), not on the per-signal CR name. This is load-bearing: when a pipeline flips from split CRs (<pipeline>-logs) to the merged <pipeline>-node-agent, the new pod re-uses the same on-node bbolt offset DB instead of starting fresh and re-tailing every log file from byte 0. Pinned by TestAddReceiverDrivenVolumes_FileStoragePathStableAcrossSignalRename.
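The stability property can be illustrated with two small functions. Both names are hypothetical; the path prefix and the CR naming pattern are taken from the paragraph above.

```python
def file_storage_host_path(pipeline: str) -> str:
    """hostPath for the offset DB: depends on the pipeline name only."""
    return f"/var/lib/praxis-edge-collector/{pipeline}"


def cr_name(pipeline: str, group: str) -> str:
    """CR name: depends on the group, so it changes on split -> merged."""
    return f"{pipeline}-{group}"
```

Because the group name never enters `file_storage_host_path`, the CR name changes from `<pipeline>-logs` to `<pipeline>-node-agent` while the host path, and therefore the offset DB, stays the same.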
Rollout posture
- ADL_K8S_MERGE_NODE_AGENT defaults to on. Supervisor-side reconcile now sequences Create new → wait Ready → Delete old, so the split→merged CR rename no longer drops signals. Set the env var to false to fall back to per-signal CRs (e.g. for tenants where per-signal blast-radius isolation matters more than per-node pod count).
- The supervisor side does not special-case any signal name; it treats each k8s/<signal> config map entry as an opaque CR. Older supervisors handle merged CRs the same as split ones.
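The reconcile ordering can be sketched with injected callbacks. This is not the supervisor's code: the function name, the callback signatures, and the choice to keep the old CRs when the new one never becomes Ready are all assumptions made for illustration.

```python
from typing import Callable, Iterable


def replace_crs(
    new_name: str,
    old_names: Iterable[str],
    create: Callable[[str], None],
    wait_ready: Callable[[str], bool],
    delete: Callable[[str], None],
) -> bool:
    """Create new -> wait Ready -> delete old, in that order.

    Returns False and leaves the old CRs untouched if the new CR does not
    become Ready (assumed behaviour, not confirmed by the source).
    """
    create(new_name)
    if not wait_ready(new_name):
        return False
    for old in old_names:
        delete(old)
    return True
```

Deleting only after readiness is what keeps at least one collector tailing each signal throughout the rename.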
K8sDeployMeta precedence
Per-signal extractDeployMetaForSignal reads from the K8s receiver node's config (via the receiver's ExtractDeployMeta). For the merged node-agent group, mergeDeployMetaForNodeAgent takes the union: TolerateTaints is OR'd, PriorityClassName takes the first non-empty, LogPath/CRIType/RecombineLogs come from the logs-receiver's meta. A pipeline that opts into apply_tolerations: true on any member receiver gets a tolerant DaemonSet for the merged pod — important so log collection on tainted nodes doesn't silently drop after merge.
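The union rule can be sketched as below. The field names follow the ones mentioned above, but the Python class, the field types, and the use of dict insertion order to decide which PriorityClassName is "first" are assumptions for illustration; the real K8sDeployMeta may differ.

```python
from dataclasses import dataclass


@dataclass
class DeployMeta:
    # Field types are assumptions; only the names come from the text above.
    tolerate_taints: bool = False
    priority_class_name: str = ""
    log_path: str = ""
    cri_type: str = ""
    recombine_logs: bool = False


def merge_deploy_meta_for_node_agent(metas: dict) -> DeployMeta:
    """Merge per-signal metas (keyed by signal name) for the node-agent group."""
    logs_meta = metas.get("logs", DeployMeta())
    return DeployMeta(
        # OR across members: one tolerant receiver makes the merged pod tolerant.
        tolerate_taints=any(m.tolerate_taints for m in metas.values()),
        # First non-empty value in iteration order (ordering is an assumption).
        priority_class_name=next(
            (m.priority_class_name for m in metas.values() if m.priority_class_name), ""
        ),
        # Log-specific settings come only from the logs receiver's meta.
        log_path=logs_meta.log_path,
        cri_type=logs_meta.cri_type,
        recombine_logs=logs_meta.recombine_logs,
    )
```

If only the kubeletstats receiver sets tolerations, the merged meta still has `tolerate_taints` set, which is the behaviour the paragraph above calls out for tainted nodes.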