Manufacturing plants produce data at machine speed. A single assembly line has 50+ sensors: temperature, vibration, pressure, humidity. Each sends a reading every 100ms. That’s 500+ data points per second, per line. At Litmus, we built the backend to ingest this in real time and surface anomalies before machines fail.
See also: Cloud Spanner · Observability patterns · Database migrations · Voice AI · High-throughput systems
Sensor Data Flow (plant → edge → cloud):
Manufacturing Plant
├─ Sensor 1 (MQTT)
├─ Sensor 2 (MQTT)
├─ PLC (OPC-UA) ← Siemens, ABB, Schneider
└─ Local Buffer (queue readings while the cloud is unreachable)
↓
Edge Gateway (Kubernetes on-premise)
↓
Cloud (Python streaming pipelines)
↓
Analytics + Anomaly Detection
MQTT is lightweight: its fixed header is as small as 2 bytes per message, which makes it a good fit for constrained sensors.
import paho.mqtt.client as mqtt
import json
def on_message(client, userdata, msg):
    """Forward one MQTT sensor message to the stream.

    The payload is expected to be a JSON object such as
    {'sensor_id': 'TEMP-01', 'temperature': 42.5, 'timestamp': 1686...};
    the plant id is taken from the client's userdata.

    Malformed payloads (invalid JSON, a JSON value that is not an object,
    or missing keys) are logged and dropped. With paho-mqtt's default
    ``suppress_exceptions=False`` an exception raised in this callback
    propagates out of ``loop_forever()`` and would stop ingestion for every
    sensor on the gateway.
    """
    # Resolved outside the try: a misconfigured userdata is a setup error
    # that should surface, not be logged per message.
    plant_id = userdata['plant_id']
    try:
        payload = json.loads(msg.payload)
        record = {
            'sensor_id': payload['sensor_id'],
            'value': payload['temperature'],
            'timestamp': payload['timestamp'],
            'plant_id': plant_id,
        }
    except (ValueError, KeyError, TypeError) as err:
        # ValueError covers JSONDecodeError and UnicodeDecodeError;
        # TypeError covers payloads that decode to a list or scalar.
        import logging
        logging.getLogger(__name__).warning(
            "Dropping malformed sensor message on %s: %s",
            getattr(msg, 'topic', '?'), err,
        )
        return
    # Write to stream (Kafka, Pub/Sub)
    stream_producer.send(record)
def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to all sensor topics after every successful (re)connect.

    Subscribing here rather than once after ``connect()`` means the
    subscription is renewed when paho's ``loop_forever()`` reconnects: with a
    clean session the broker forgets previous subscriptions, and the gateway
    would otherwise silently stop receiving data.
    ``properties`` defaults to None so the callback accepts both the 4-arg
    (MQTT 3.1.1) and 5-arg (MQTT 5 / callback API v2) call forms.
    """
    if rc == 0:
        client.subscribe("sensors/+/data")  # Subscribe to all sensors


client = mqtt.Client(client_id="edge-gateway-01")
client.user_data_set({'plant_id': 'plant-xyz'})
# Register callbacks before connecting so the first CONNACK is handled too.
client.on_connect = on_connect
client.on_message = on_message
client.connect("plant-mqtt-broker.local", 1883, keepalive=60)
client.loop_forever()
OPC-UA is the standard protocol for reading values from PLCs (Programmable Logic Controllers) on the factory floor. The poller below reads PLC registers on a fixed interval.
from opcua import Client
class OPCUAPoller:
    """Poll PLC registers over OPC-UA and forward each value to a stream.

    Each poll reads every configured register node and sends one record per
    register: {'sensor_id': ..., 'value': ..., 'timestamp': <epoch seconds>}.
    """

    # Register node ids polled by default, keyed by the sensor_id each value
    # is published under: ns=2;i=1001 is the temperature register and
    # ns=2;i=1002 the pressure register in this example plant.
    DEFAULT_REGISTERS = {
        'PLC-TEMP': 'ns=2;i=1001',
        'PLC-PRESSURE': 'ns=2;i=1002',
    }

    def __init__(self, endpoint: str):
        """Create an OPC-UA client for ``endpoint`` and connect immediately."""
        self.client = Client(endpoint)
        self.client.connect()

    def poll_registers(
        self,
        poll_interval_ms: int = 100,
        registers: "dict[str, str] | None" = None,
        sink=None,
        max_polls: "int | None" = None,
    ):
        """Read the PLC registers every ``poll_interval_ms`` and send each value.

        Args:
            poll_interval_ms: Target period between the starts of two polls.
            registers: Mapping of sensor_id -> OPC-UA node id to read;
                defaults to DEFAULT_REGISTERS (temperature and pressure).
            sink: Object with a ``send(record)`` method; defaults to ``stream``.
            max_polls: Stop after this many polls; None (default) polls forever.
        """
        import time  # imported locally so the block is self-contained

        if sink is None:
            sink = stream
        if registers is None:
            registers = self.DEFAULT_REGISTERS
        # Resolve node handles once instead of on every iteration.
        nodes = {
            sensor_id: self.client.get_node(node_id)
            for sensor_id, node_id in registers.items()
        }
        interval_s = poll_interval_ms / 1000
        next_poll = time.monotonic()
        polls = 0
        while max_polls is None or polls < max_polls:
            values = {sensor_id: node.get_value() for sensor_id, node in nodes.items()}
            # One sample time per poll, as epoch seconds: JSON-serializable and
            # timezone-unambiguous, unlike a naive local datetime.
            sampled_at = time.time()
            for sensor_id, value in values.items():
                sink.send({
                    'sensor_id': sensor_id,
                    'value': value,
                    'timestamp': sampled_at,
                })
            polls += 1
            # Schedule against a fixed cadence so read/send time does not
            # stretch the period beyond poll_interval_ms.
            next_poll += interval_s
            delay = next_poll - time.monotonic()
            if delay > 0:
                time.sleep(delay)
            else:
                # Fell behind (slow PLC or sink): resync instead of bursting.
                next_poll = time.monotonic()

    def disconnect(self):
        """Close the OPC-UA session."""
        self.client.disconnect()
Raw sensor readings are mostly noise. The pipeline below turns them into actionable alerts by flagging readings that deviate too far from a baseline.
from apache_beam import Pipeline, Map, Filter
from apache_beam.options.pipeline_options import PipelineOptions
class SensorAnomalyDetector:
    """Flag readings whose value is too far from a fixed baseline.

    Every reading is compared against the same baseline, whatever its
    sensor_id. The defaults are the normal operating temperature (42.0 °C)
    and alert band (5.0 °C) used in this example.
    """

    def __init__(self, baseline_temp: float = 42.0, threshold: float = 5.0):
        self.baseline_temp = baseline_temp  # Normal operating temperature
        self.threshold = threshold  # Alert if |value - baseline| > threshold

    def detect_anomaly(self, reading: dict):
        """Return an alert dict for an out-of-range reading, else None.

        Args:
            reading: Mapping with 'sensor_id', 'value' and 'timestamp' keys.

        Returns:
            {'sensor_id', 'value', 'deviation', 'status': 'ANOMALY',
            'timestamp'} when abs(value - baseline_temp) > threshold, where
            deviation is signed (value - baseline_temp). None otherwise,
            including when 'value' is None.
        """
        value = reading['value']
        if value is None:
            # No measurement to compare; subtracting would raise TypeError
            # and fail the whole pipeline bundle.
            return None
        deviation = value - self.baseline_temp
        if abs(deviation) > self.threshold:
            return {
                'sensor_id': reading['sensor_id'],
                'value': value,
                'deviation': deviation,
                'status': 'ANOMALY',
                'timestamp': reading['timestamp'],
            }
        return None
# Dataflow pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, StandardOptions

options = PipelineOptions()
# ReadFromPubSub is an unbounded source and requires a streaming pipeline.
options.view_as(StandardOptions).streaming = True
# Pub/Sub IO expects fully-qualified names: projects/<project>/topics/<topic>.
# The project comes from the --project pipeline option.
project = options.view_as(GoogleCloudOptions).project
if not project:
    raise ValueError("--project is required to build Pub/Sub topic paths")
detector = SensorAnomalyDetector()

pipeline = Pipeline(options=options)
anomalies = (
    pipeline
    | 'Read from Pub/Sub' >> beam.io.gcp.pubsub.ReadFromPubSub(
        topic=f'projects/{project}/topics/sensor-data')
    | 'Parse JSON' >> Map(json.loads)
    | 'Detect Anomalies' >> Map(detector.detect_anomaly)
    | 'Filter Nulls' >> Filter(lambda x: x is not None)
    # WriteToPubSub publishes bytes, so serialize each alert dict first.
    | 'Encode JSON' >> Map(lambda alert: json.dumps(alert).encode('utf-8'))
    | 'Write Alerts' >> beam.io.gcp.pubsub.WriteToPubSub(
        topic=f'projects/{project}/topics/anomalies')
)
pipeline.run()
Plants often have unreliable internet connectivity, so edge gateways buffer readings locally and sync them once the connection is restored.
import sqlite3
from datetime import datetime, timedelta
class EdgeBuffer:
    """SQLite-backed store-and-forward buffer for readings on an edge gateway.

    Readings that could not be delivered are written to a local database and
    replayed oldest-first by ``sync_to_cloud``. The connection may be used
    from a background sync thread, so it is opened with
    ``check_same_thread=False`` and every database access is serialized with
    a lock.
    """

    def __init__(self, db_path: str = "sensor_buffer.db"):
        import threading

        # Without check_same_thread=False, sqlite3 raises ProgrammingError
        # when the connection is used from a thread other than its creator.
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = threading.Lock()
        self._init_schema()

    def _init_schema(self):
        """Create the buffer table and its unsynced-lookup index if missing."""
        with self._lock:
            # value and timestamp have no declared type so they are stored
            # exactly as given (int, float or text).
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS buffered_readings (
                    id INTEGER PRIMARY KEY,
                    sensor_id TEXT,
                    value,
                    timestamp,
                    synced INTEGER NOT NULL DEFAULT 0
                )
            """)
            self.db.execute("""
                CREATE INDEX IF NOT EXISTS idx_buffered_readings_unsynced
                ON buffered_readings (synced, timestamp)
            """)
            self.db.commit()

    def buffer_reading(self, reading: dict):
        """Write a reading to the local DB (used when the cloud is unreachable)."""
        with self._lock:
            self.db.execute("""
                INSERT INTO buffered_readings
                (sensor_id, value, timestamp, synced)
                VALUES (?, ?, ?, 0)
            """, (reading['sensor_id'], reading['value'], reading['timestamp']))
            self.db.commit()

    def sync_to_cloud(self, cloud_endpoint: "str | None" = None, client=None) -> int:
        """Send buffered readings oldest-first until one fails.

        Args:
            cloud_endpoint: Accepted for backward compatibility; unused.
                Delivery goes through ``client``.
            client: Object with a ``send(record)`` method; defaults to
                ``cloud_client``.

        Returns:
            Number of readings delivered and marked synced in this call.

        A send failure is logged and stops the pass; the remaining rows stay
        unsynced for the next attempt. Each row is marked synced only after
        its send succeeds, so delivery is at-least-once.
        """
        if client is None:
            client = cloud_client
        with self._lock:
            unsynced = self.db.execute("""
                SELECT id, sensor_id, value, timestamp FROM buffered_readings
                WHERE synced = 0
                ORDER BY timestamp ASC
            """).fetchall()
        delivered = 0
        for row_id, sensor_id, value, timestamp in unsynced:
            try:
                client.send({
                    'sensor_id': sensor_id,
                    'value': value,
                    'timestamp': timestamp,
                })
            except Exception as err:
                # Deliberate best-effort: connection failed, retry later.
                import logging
                logging.getLogger(__name__).warning(
                    "Cloud sync stopped after %d readings: %s", delivered, err)
                break
            # The lock is not held during the network send, so readings can
            # still be buffered while a slow sync is in progress.
            with self._lock:
                self.db.execute(
                    "UPDATE buffered_readings SET synced = 1 WHERE id = ?", (row_id,))
                self.db.commit()
            delivered += 1
        return delivered

    def close(self):
        """Close the underlying SQLite connection."""
        with self._lock:
            self.db.close()
# Usage
import logging
import threading
import time

buffer = EdgeBuffer()
try:
    cloud_client.send(reading)
except (ConnectionError, TimeoutError):
    # Cloud unreachable: keep the reading locally until the next sync.
    buffer.buffer_reading(reading)


def _sync_periodically(interval_s: float = 30.0) -> None:
    """Retry flushing the local buffer every ``interval_s`` seconds, forever.

    The 30 s default is an example value; tune it to the plant's link.
    """
    while True:
        try:
            # sync_to_cloud takes an endpoint argument that it does not use
            # (delivery goes through cloud_client), so pass None explicitly.
            buffer.sync_to_cloud(None)
        except Exception:
            # Keep the retry loop alive: a dead sync thread would silently
            # stop draining the buffer.
            logging.getLogger(__name__).exception("Edge buffer sync failed")
        time.sleep(interval_s)


# Retry on timer. Daemon thread, so it does not block interpreter shutdown.
sync_thread = threading.Thread(
    target=_sync_periodically, name="edge-buffer-sync", daemon=True)
sync_thread.start()
Tags: #IoT #MQTT #OPC-UA #EdgeComputing #Streaming #Manufacturing
Published: June 2026
Author: Pratik Dhanave
Related Projects: Litmus Industrial IoT