Siphon

Overview

Siphon is a lightweight, dynamically configurable data aggregation engine written in Go. Designed for edge computing and containerized deployments, Siphon acts as an intelligent middleware layer. It gathers raw telemetry data from diverse sources (such as MQTT brokers, local files, and shell command outputs), normalizes it, and dispatches it to various sinks (REST APIs, notification services, or local files) based on temporal or event-driven rules.

Architectural Overview

Siphon uses a decoupled, hub-and-spoke architecture centered on an in-memory Data Hub. The data flow breaks down into five distinct phases:

  1. Collectors (The Inputs): Actively subscribe to or poll configured sources. Current implementations include mqtt (subscribes to topics), file (reads file contents), and shell (executes OS commands).
  2. Parsers (The Normalizers): Convert raw, unstructured data from the Collectors into intermediate key-value pairs. Supported engines include jsonpath for structured JSON and regex for raw text extraction.
  3. Data Hub: The central state manager holding the parsed data between ingestion and dispatch.
  4. Dispatchers (The Schedulers): Determine when data should be pushed to the destinations.
    • Cron: Uses standard cron syntax (down to the second) to schedule batch dispatches.
    • Event: Triggers immediately upon a value change, threshold breach, or a specified timeout (e.g., alerting when a sensor goes offline).
  5. Sinks (The Outputs): Format and transmit the scheduled data to its final destination. Sinks range from simple stdout and CSV files to external REST services like Gotify, Windy, and IOTPlotter.

Architecture diagram:

  graph LR
    subgraph Collectors
        A1[MQTT Topic]
        A2[Local File]
        A3[Shell Command]
    end

    subgraph Core_Engine[Siphon Core]
        B[Parsers: JSONPath/Regex]
        C{Data Hub}
        D[Dispatchers: Cron/Event]
    end

    subgraph Sinks
        E1[REST / Gotify]
        E2[MQTT / HA Discovery]
        E3[CSV / File Sink]
    end

    A1 & A2 & A3 --> B
    B --> C
    C --> D
    D --> E1 & E2 & E3
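The flow in the diagram can be sketched with plain Go channels. This is an illustrative toy, not Siphon's actual internals: the `raw` and `parsed` types and the trivial parser are invented for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// raw is a sample as a Collector might emit it; parsed is the
// key/value form a Parser normalizes it into. Both names are
// invented for this sketch.
type raw struct{ source, payload string }
type parsed map[string]string

// parse mimics a trivial regex-style parser: it keeps only the
// numeric run in the payload.
func parse(r raw) parsed {
	digits := strings.TrimFunc(r.payload, func(c rune) bool { return c < '0' || c > '9' })
	return parsed{"value": digits}
}

func main() {
	collected := make(chan raw, 1) // Collector -> Parser
	hub := make(chan parsed, 1)    // Parser -> Data Hub -> Dispatcher

	// Collector phase: emit one fake MQTT-style sample.
	collected <- raw{source: "mqtt:/topic1", payload: "temp=215"}
	close(collected)

	// Parser phase: normalize raw samples into key/value pairs.
	for r := range collected {
		hub <- parse(r)
	}
	close(hub)

	// Dispatcher + Sink phase: a plain stdout sink, no scheduling.
	for p := range hub {
		fmt.Println("sink:", p["value"])
	}
}
```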

Key Features

  • Expression Evaluation: Siphon integrates the powerful expr engine. This allows you to perform real-time math on incoming variables (e.g., converting Celsius to Fahrenheit inline) and dynamically generate complex JSON payloads for your sinks.
  • Environment Variable Substitution: Securely inject secrets (like API keys) or environment-specific variables into your YAML configuration using the %%ENV_VARIABLE%% syntax.
  • Extensible Module System: Adding a new integration is entirely self-contained. Developers can easily write a new Collector or Sink in Go and register it in internal/modules/modules.go without altering the core engine.
  • Cloud-Native Builds: Siphon is built using ko, providing minimal, secure, multi-architecture (x86/ARM) Docker containers out of the box.

Configuration Example

Siphon is entirely driven by a declarative YAML configuration. The following v1 schema demonstrates how to simultaneously pull weather data from MQTT, read local CPU temperatures, and monitor disk space via shell commands, dispatching the results via both Cron schedules and Event triggers.

  version: 1

  collectors:
    mqtt:
      type: mqtt
      params:
        url: ssl://mqtt:8883
        user: user
        pass: pass
    mqtt-2:
      type: mqtt
      params:
        url: ssl://mqtt2:8883
    cpuTemp:
      type: file
      params:
        interval: 5
    cmd:
      type: shell
      params:
        interval: 5

  sinks:
    plotter:
      type: iotplotter
      params:
        url: http://iotplotter.com
        apikey: "%%IOT_API_KEY%%" # Environment variable expansion
        feed: "123456789"
    gotify:
      type: gotify
      params:
        url: https://gotify
        token: ABCDEFG
    windy:
      type: windy
      params:
        apikey: abcdefg
        id: 1
    stdout:
      type: stdout

  data:
    outside:
      collector: mqtt
      path: /topic1
      parser: jsonpath
      vars:
        temp: "$.temp"
        humi: "$.humi"
      conv:
        temp: "*10" # Expression evaluation to convert values
    cnt:
      collector: mqtt-2
      path: /topic/2
      parser: jsonpath
      vars:
        cnt: "$"
    cpu:
      collector: cpuTemp
      path: /sys/class/thermal/thermal_zone0/temp
      parser: regex
      vars:
        temp: "[0-9]*"
      conv:
        temp: float(temp) / 1000
    free:
      collector: cmd
      path: "df -BM | grep -E '/$' | awk '{print $4}'"
      parser: regex
      vars:
        space: "[0-9]*"
      conv:
        space: float(space) / 1024

  dispatchers:
  # 1. Cron dispatcher: fires every 30 seconds
  - type: cron
    param: "*/30 * * * * *"
    sinks:
    - name: stdout
      type: expr # Generates a JSON string using the state from the Data Hub
      spec: |
        {
          "data": {
            "temp": [ {"value": outside?.temp ?? 0} ],
            "humi": [ {"value": outside?.humi ?? 0} ]
          }
        }
    - name: stdout
      type: template # Standard Go templating
      spec: |
        Cnt: {{ .cnt.cnt }}

  # 2. Event dispatcher: fires when the 'outside' timestamp changes, or times out after 60s
  - type: event
    param:
      trigger: outside
      var: timestamp
      expr: new != old
      timeout: 60
    sinks:
    - name: stdout
      type: template
      spec: |
        EVENT: {{ if IsTimeout }}no update! {{ else }} new outside measurement! {{ end }}
    - name: stdout
      type: expr
      spec: |
        IsTimeout() ? "timeout" : "ok"
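The `template` sink specs above use Go's standard text/template package. A minimal sketch of how a helper like `IsTimeout` could be exposed to such a spec; registering it via `FuncMap` is an assumption about Siphon's internals.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderEventSpec renders a dispatcher sink spec of type "template",
// exposing an IsTimeout helper the way the example config uses it.
// The FuncMap wiring is an assumption, not Siphon's actual code.
func renderEventSpec(spec string, timedOut bool, data map[string]map[string]string) (string, error) {
	tmpl, err := template.New("sink").
		Funcs(template.FuncMap{"IsTimeout": func() bool { return timedOut }}).
		Parse(spec)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	spec := "EVENT: {{ if IsTimeout }}no update!{{ else }}new outside measurement!{{ end }}"
	out, _ := renderEventSpec(spec, true, nil)
	fmt.Println(out)
}
```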

Deployment

Siphon is designed to be deployed as a containerized service. Mount your configuration file into the container to start processing data immediately.

  version: '2'

  volumes:
    config:

  services:
    siphon:
      image: ghcr.io/mekops-labs/siphon:latest
      command:
      - /config/config.yaml
      volumes:
      - config:/config
      restart: always

Roadmap

The project is still a work in progress. My current focus is to make Siphon a viable Home Assistant extension with support for MQTT auto-discovery; after that, I plan to polish the project.

Phase 1: Core Engine Refactor (V2 Architecture)

Focus: Implementing the Hybrid Event Bus and the new linear pipeline configuration.

  • ☐ Config V2 Schema: Define config.go structs to parse the new pipelines array and generic parameters.
  • ☐ Hybrid Bus Engine: Build HybridBus supporting both ModeVolatile (ring buffers) and ModeDurable (WAL).
  • ☐ Modules Refactor: Update all existing Collectors to publish to the HybridBus and handle backend delivery failures.
  • ☐ Sink Refactor: Update all existing Sinks to subscribe to the HybridBus and explicitly call event.Ack() upon success.
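One possible shape for `ModeVolatile` is sketched below. The `HybridBus`, `Event`, and `Ack()` names come from the roadmap items above; the drop-oldest ring semantics and everything else are assumptions for illustration.

```go
package main

import "fmt"

// Event is a bus message; Ack is a placeholder for the delivery-
// acknowledgement hook the roadmap describes for Sinks.
type Event struct {
	Topic   string
	Payload string
	acked   bool
}

func (e *Event) Ack() { e.acked = true }

// VolatileBus sketches ModeVolatile: a fixed-size ring that drops the
// oldest event when full instead of blocking the publisher. ModeDurable
// (WAL-backed) is out of scope for this sketch.
type VolatileBus struct {
	ring chan *Event
}

func NewVolatileBus(size int) *VolatileBus {
	return &VolatileBus{ring: make(chan *Event, size)}
}

func (b *VolatileBus) Publish(e *Event) {
	for {
		select {
		case b.ring <- e:
			return
		default:
			<-b.ring // full: drop the oldest event and retry
		}
	}
}

func (b *VolatileBus) Next() *Event { return <-b.ring }

func main() {
	bus := NewVolatileBus(2)
	for i := 1; i <= 3; i++ {
		bus.Publish(&Event{Topic: "t", Payload: fmt.Sprint(i)})
	}
	e := bus.Next() // event "1" was dropped; the oldest survivor is "2"
	e.Ack()
	fmt.Println(e.Payload, e.acked)
}
```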

Phase 2: Home Assistant Native Ecosystem

Focus: Seamless integration, auto-discovery, and providing a premium Add-on experience.

  • ☐ HA Config Structs: Define DiscoveryConfig and map YAML fields for HA metadata (Device Class, Node ID, etc.).
  • ☐ HA MQTT Payloads: Implement JSON-tagged DiscoveryPayload and DevicePayload structs.
  • ☐ MQTT Reliability: Implement Last Will and Testament (LWT) logic in the MQTT Sink.
  • ☐ Auto-Discovery Hooks: Create Announce() method to publish retained discovery configs upon MQTT connection.
  • ☐ Add-on Repository: Create the standalone HA Add-on structure.
  • ☐ Ingress Web Server: Implement an embedded Go web server in pkg/editor.
  • ☐ Embedded UI: Create an index.html using Ace.js for live config.yaml editing via HA Ingress.

Phase 3: API Gateway & Synchronous Processing

Focus: Transforming Siphon into a two-way sync engine for devices like wearables.

  • ☐ Reply Channels: Add a ReplyTo channel to the Event struct to support synchronous routing.
  • ☐ Webserver Collector: Implement a Collector that keeps HTTP requests open while waiting for the pipeline to finish.
  • ☐ Responder Sink: Implement a Sink that formats pipeline output and writes it back to the ReplyTo channel.
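The reply-channel idea can be sketched with a plain Go channel standing in for `ReplyTo`. All of the wiring here is hypothetical: `process` stands in for the whole collector-to-sink pipeline, and the HTTP layer is omitted.

```go
package main

import "fmt"

// Event carries an optional ReplyTo channel so a pipeline result can be
// routed back to a waiting HTTP request, as Phase 3 sketches. The field
// names follow the roadmap; the wiring is hypothetical.
type Event struct {
	Payload string
	ReplyTo chan string
}

// process stands in for the whole collector -> parser -> sink pipeline.
func process(in chan Event) {
	for e := range in {
		result := "processed:" + e.Payload
		if e.ReplyTo != nil {
			e.ReplyTo <- result // the Responder Sink writes back
		}
	}
}

func main() {
	pipeline := make(chan Event)
	go process(pipeline)

	// Webserver Collector: hold the "request" open until the reply arrives.
	reply := make(chan string, 1)
	pipeline <- Event{Payload: "hr=72", ReplyTo: reply}
	fmt.Println(<-reply)
}
```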

Phase 4: Benchmarking & Performance

Focus: Proving reliability and measuring tail latency under extreme load.

  • ☐ Harness Setup: Create an E2E testing harness (cmd/bench/main.go) utilizing a local MQTT broker.
  • ☐ Precision Metrics: Integrate hdrhistogram-go for microsecond-precision latency and jitter tracking.

Phase 5: Advanced Extensibility & Polish

Focus: Documentation, tooling, and future-proofing the parser engine.

  • ☐ JSON Schema: Generate a complete JSON Schema for config.yaml (v2).
  • ☐ Wasm: WebAssembly processors support (plugin system), TBD.