# Siphon

## Overview
Siphon is a lightweight, dynamically configurable data aggregation engine written in Go. Designed for edge computing and containerized deployments, Siphon acts as an intelligent middleware layer. It gathers raw telemetry data from diverse sources (such as MQTT brokers, local files, and shell command outputs), normalizes it, and dispatches it to various sinks (REST APIs, notification services, or local files) based on temporal or event-driven rules.
## Architectural Overview
Siphon utilizes a decoupled, hub-and-spoke architecture centered around an in-memory Data Hub. The data flow is broken down into five distinct phases:
- Collectors (The Inputs): Actively subscribe to or poll configured sources. Current implementations include `mqtt` (subscribes to topics), `file` (reads file contents), and `shell` (executes OS commands).
- Parsers (The Normalizers): Convert raw, unstructured data from the Collectors into intermediate key-value pairs. Supported engines include `jsonpath` for structured JSON and `regex` for raw text extraction.
- Data Hub: The central state manager holding the parsed data between ingestion and dispatch.
- Dispatchers (The Schedulers): Determine when data should be pushed to the destinations.
  - Cron: Uses standard cron syntax (down to the second) to schedule batch dispatches.
  - Event: Triggers immediately upon a value change, threshold breach, or a specified timeout (e.g., alerting when a sensor goes offline).
- Sinks (The Outputs): Format and transmit the scheduled data to their final destination. Sinks range from simple `stdout` and CSV files to external REST services like Gotify, Windy, and IOTPlotter.
Architecture diagram:

```mermaid
graph LR
    subgraph Collectors
        A1[MQTT Topic]
        A2[Local File]
        A3[Shell Command]
    end
    subgraph Core_Engine[Siphon Core]
        B[Parsers: JSONPath/Regex]
        C{Data Hub}
        D[Dispatchers: Cron/Event]
    end
    subgraph Sinks
        E1[REST / Gotify]
        E2[MQTT / HA Discovery]
        E3[CSV / File Sink]
    end
    A1 & A2 & A3 --> B
    B --> C
    C --> D
    D --> E1 & E2 & E3
```
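The five phases above map naturally onto small Go interfaces. The following is a hypothetical sketch of what they imply; Siphon's actual internal types may differ:

```go
package main

import "fmt"

// Hypothetical interfaces implied by the architecture above; these are
// illustrative only, not Siphon's actual API.

// Collector produces raw payloads from a source (MQTT, file, shell).
type Collector interface {
	Collect() ([]byte, error)
}

// Parser normalizes raw payloads into key-value pairs for the Data Hub.
type Parser interface {
	Parse(raw []byte) (map[string]any, error)
}

// Sink formats and transmits data scheduled by a Dispatcher.
type Sink interface {
	Send(vars map[string]any) error
}

// staticCollector is a toy Collector used to show the flow end to end.
type staticCollector struct{ data []byte }

func (s staticCollector) Collect() ([]byte, error) { return s.data, nil }

func main() {
	var c Collector = staticCollector{data: []byte(`{"temp": 21.5}`)}
	raw, _ := c.Collect()
	fmt.Println(string(raw)) // raw payload, ready for a Parser
}
```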
## Key Features

- Expression Evaluation: Siphon integrates the powerful `expr` engine. This allows you to perform real-time math on incoming variables (e.g., converting Celsius to Fahrenheit inline) and dynamically generate complex JSON payloads for your sinks.
- Environment Variable Substitution: Securely inject secrets (like API keys) or environment-specific variables into your YAML configuration using the `%%ENV_VARIABLE%%` syntax.
- Extensible Module System: Adding a new integration is entirely self-contained. Developers can easily write a new Collector or Sink in Go and register it in `internal/modules/modules.go` without altering the core engine.
- Cloud-Native Builds: Siphon is built using `ko`, providing minimal, secure, multi-architecture (x86/ARM) Docker containers out of the box.
## Configuration Example
Siphon is entirely driven by a declarative YAML configuration. The following v1 schema demonstrates how to simultaneously pull weather data from MQTT, read local CPU temperatures, and monitor disk space via shell commands, dispatching the results via both Cron schedules and Event triggers.
```yaml
version: 1

collectors:
  mqtt:
    type: mqtt
    params:
      url: ssl://mqtt:8883
      user: user
      pass: pass
  mqtt-2:
    type: mqtt
    params:
      url: ssl://mqtt2:8883
  cpuTemp:
    type: file
    params:
      interval: 5
  cmd:
    type: shell
    params:
      interval: 5

sinks:
  plotter:
    type: iotplotter
    params:
      url: http://iotplotter.com
      apikey: %%IOT_API_KEY%% # Environment variable expansion
      feed: "123456789"
  gotify:
    type: gotify
    params:
      url: https://gotify
      token: ABCDEFG
  windy:
    type: windy
    params:
      apikey: abcdefg
      id: 1
  stdout:
    type: stdout

data:
  outside:
    collector: mqtt
    path: /topic1
    parser: jsonpath
    vars:
      temp: "$.temp"
      humi: "$.humi"
    conv:
      temp: "*10" # Expression evaluation to convert values
  cnt:
    collector: mqtt-2
    path: /topic/2
    parser: jsonpath
    vars:
      cnt: "$"
  cpu:
    collector: cpuTemp
    path: /sys/class/thermal/thermal_zone0/temp
    parser: regex
    vars:
      temp: "[0-9]*"
    conv:
      temp: float(temp) / 1000
  free:
    collector: cmd
    path: "df -BM | grep -E '/$' | awk '{print $4}'"
    parser: regex
    vars:
      space: "[0-9]*"
    conv:
      space: float(space) / 1024

dispatchers:
# 1. Cron Dispatcher: Fires every 30 seconds
- type: cron
  param: "*/30 * * * * *"
  sinks:
  - name: stdout
    type: expr # Generates a JSON string using the state from the Data Hub
    spec: |
      {
        "data": {
          "temp": [ {"value": outside?.temp ?? 0} ],
          "humi": [ {"value": outside?.humi ?? 0} ]
        }
      }
  - name: stdout
    type: template # Standard Go templating
    spec: |
      Cnt: {{ .cnt.cnt }}

# 2. Event Dispatcher: Fires when 'outside' timestamp changes, or times out after 60s
- type: event
  param:
    trigger: outside
    var: timestamp
    expr: new != old
    timeout: 60
  sinks:
  - name: stdout
    type: template
    spec: |
      EVENT: {{ if IsTimeout }}no update! {{ else }} new outside measurement! {{ end }}
  - name: stdout
    type: expr
    spec: |
      IsTimeout() ? "timeout" : "ok"
```
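To make the `cpu` data entry above concrete: the `file` collector reads `/sys/class/thermal/thermal_zone0/temp`, the `regex` parser extracts the number, and the `conv` expression divides by 1000 to get degrees Celsius. A self-contained sketch of that transformation (illustrative only, not Siphon's actual code):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// parseCPUTemp mimics the "cpu" pipeline entry: extract the first run of
// digits (the config uses the pattern "[0-9]*"; "[0-9]+" is used here to
// avoid empty matches) and apply conv "float(temp) / 1000".
func parseCPUTemp(raw string) (float64, error) {
	match := regexp.MustCompile(`[0-9]+`).FindString(raw)
	milli, err := strconv.ParseFloat(match, 64) // kernel reports millidegrees
	if err != nil {
		return 0, err
	}
	return milli / 1000, nil
}

func main() {
	temp, _ := parseCPUTemp("45000\n")
	fmt.Println(temp) // 45
}
```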
## Deployment
Siphon is designed to be deployed as a containerized service. Mount your configuration file into the container to start processing data immediately.
```yaml
version: '2'

volumes:
  config:

services:
  siphon:
    image: ghcr.io/mekops-labs/siphon:latest
    command:
      - /config/config.yaml
    volumes:
      - config:/config
    restart: always
```
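For a quick test without Compose, an equivalent single `docker run` should work as well (the bind-mount path is illustrative; adjust it to wherever your config lives):

```shell
docker run --rm \
  -v "$(pwd)/config.yaml:/config/config.yaml:ro" \
  ghcr.io/mekops-labs/siphon:latest /config/config.yaml
```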
## Roadmap

The project is still a work in progress. My current direction is to make it a viable Home Assistant extension with support for MQTT auto-discovery; after that, I plan to polish the project.
### Phase 1: Core Engine Refactor (V2 Architecture)
Focus: Implementing the Hybrid Event Bus and the new linear pipeline configuration.
- ☐ Config V2 Schema: Define config.go structs to parse the new pipelines array and generic parameters.
- ☐ Hybrid Bus Engine: Build HybridBus supporting both ModeVolatile (ring buffers) and ModeDurable (WAL).
- ☐ Modules Refactor: Update all existing Collectors to publish to the HybridBus and handle backend delivery failures.
- ☐ Sink Refactor: Update all existing Sinks to subscribe to the HybridBus and explicitly call event.Ack() upon success.
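The Phase 1 items imply roughly the following event shape: Collectors publish to the bus, Sinks consume and explicitly acknowledge. This is a speculative sketch based only on the roadmap wording (every name beyond `Event.Ack()`, `ModeVolatile`, and `ModeDurable` is a placeholder, not a committed API):

```go
package main

import "fmt"

// Event is a hypothetical bus message. A volatile (ring-buffer) bus may
// drop unacknowledged events under pressure; a durable (WAL-backed) bus
// would only advance past an event once Ack() is called.
type Event struct {
	Topic   string
	Payload []byte
	acked   bool
}

// Ack marks the event as successfully delivered by a Sink.
func (e *Event) Ack() { e.acked = true }

func main() {
	// A buffered channel stands in for the volatile mode of the bus.
	bus := make(chan *Event, 8)

	// Collector side: publish.
	bus <- &Event{Topic: "cpu", Payload: []byte("45.2")}
	close(bus)

	// Sink side: consume and acknowledge on success.
	for ev := range bus {
		fmt.Printf("deliver %s: %s\n", ev.Topic, ev.Payload)
		ev.Ack() // explicit ack, as planned in the Sink refactor
	}
}
```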
### Phase 2: Home Assistant Native Ecosystem
Focus: Seamless integration, auto-discovery, and providing a premium Add-on experience.
- ☐ HA Config Structs: Define DiscoveryConfig and map YAML fields for HA metadata (Device Class, Node ID, etc.).
- ☐ HA MQTT Payloads: Implement JSON-tagged DiscoveryPayload and DevicePayload structs.
- ☐ MQTT Reliability: Implement Last Will and Testament (LWT) logic in the MQTT Sink.
- ☐ Auto-Discovery Hooks: Create Announce() method to publish retained discovery configs upon MQTT connection.
- ☐ Add-on Repository: Create the standalone HA Add-on structure.
- ☐ Ingress Web Server: Implement an embedded Go web server in pkg/editor.
- ☐ Embedded UI: Create an index.html using Ace.js for live config.yaml editing via HA Ingress.
### Phase 3: API Gateway & Synchronous Processing
Focus: Transforming Siphon into a two-way sync engine for devices like wearables.
- ☐ Reply Channels: Add a ReplyTo channel to the Event struct to support synchronous routing.
- ☐ Webserver Collector: Implement a Collector that keeps HTTP requests open while waiting for the pipeline to finish.
- ☐ Responder Sink: Implement a Sink that formats pipeline output and writes it back to the ReplyTo channel.
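The three Phase 3 items fit together as a request/response round trip over the pipeline: the Webserver Collector blocks on a `ReplyTo` channel until the Responder Sink answers. A speculative sketch (names beyond `ReplyTo` are placeholders):

```go
package main

import "fmt"

// Event carries an optional ReplyTo channel for synchronous routing;
// it is nil for ordinary fire-and-forget events.
type Event struct {
	Payload []byte
	ReplyTo chan []byte
}

// roundTrip mimics the Webserver Collector: publish an event with a
// ReplyTo channel and keep the "HTTP request" open until the reply.
func roundTrip(pipeline chan<- *Event, payload []byte) []byte {
	ev := &Event{Payload: payload, ReplyTo: make(chan []byte, 1)}
	pipeline <- ev
	return <-ev.ReplyTo
}

func main() {
	pipeline := make(chan *Event, 1)

	// Responder Sink: format the pipeline output and write it back.
	go func() {
		for ev := range pipeline {
			ev.ReplyTo <- append([]byte("processed: "), ev.Payload...)
		}
	}()

	fmt.Println(string(roundTrip(pipeline, []byte("steps=4200"))))
	// prints "processed: steps=4200"
}
```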
### Phase 4: Benchmarking & Performance
Focus: Proving reliability and measuring tail latency under extreme load.
- ☐ Harness Setup: Create an E2E testing harness (cmd/bench/main.go) utilizing a local MQTT broker.
- ☐ Precision Metrics: Integrate hdrhistogram-go for microsecond-precision latency and jitter tracking.
### Phase 5: Advanced Extensibility & Polish
Focus: Documentation, tooling, and future-proofing the parser engine.
- ☐ JSON Schema: Generate a complete JSON Schema for config.yaml (v2).
- ☐ Wasm: WebAssembly processors support (plugin system), TBD.