Skip to main content

Local installation

Requirements

  • Python: 3.12 or higher
  • MariaDB/MySQL: 5.7+ or MariaDB 10.3+
  • MQTT Broker: Mosquitto, EMQX, or any MQTT 3.1.1 compatible broker

Step-by-step setup

1

Install Python dependencies

Create a virtual environment to isolate dependencies:
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
Install required packages:
pip install -r requirements.txt
SQLAlchemy>=2.0.0
PyMySQL>=1.1.0
paho-mqtt>=2.1.0
requests>=2.32.0
python-dotenv>=1.0.1
2

Configure environment variables

Copy the example environment file:
cp .env.example .env
Edit .env to match your infrastructure:
.env
# Database connection
DB_HOST=192.168.0.137
DB_PORT=3306
DB_NAME=db
DB_USER=demo
DB_PASSWORD=demo

# Logging
LOG_DIR=./log

# HTTP settings for POST_ENDPOINT flows
HTTP_TIMEOUT_SECONDS=10

# MQTT client configuration
MQTT_CLIENT_ID=mqtt-gateway
MQTT_KEEPALIVE=60

# Flow reload interval (seconds)
FLOWS_RELOAD_INTERVAL_SECONDS=600

Environment variable reference

VariableRequiredDefaultDescription
DB_HOSTYes-MariaDB/MySQL host address
DB_PORTNo3306Database port
DB_NAMEYes-Database name
DB_USERYes-Database username
DB_PASSWORDYes-Database password
LOG_DIRNo./logDirectory for error logs (created automatically)
HTTP_TIMEOUT_SECONDSNo10Timeout for POST_ENDPOINT HTTP requests
MQTT_CLIENT_IDNomqtt-gatewayMQTT client identifier
MQTT_KEEPALIVENo60MQTT keepalive interval (seconds)
FLOWS_RELOAD_INTERVAL_SECONDSNo600How often to reload flows from database
3

Initialize the database

The gateway automatically creates tables on first run, but you can verify your database is accessible:
mysql -h 192.168.0.137 -P 3306 -u demo -p db
Tables created automatically:
  • mqtt_servers: MQTT broker configuration
  • flows: Flow definitions (topics, actions, schemas)
  • data: Stored message attributes (for STORE_DB flows)
The gateway uses SQLAlchemy’s create_all() which is idempotent. Running it multiple times is safe.
4

Start the service

Run the gateway:
python app.py
The entry point is simple:
app.py
from src.main import run

if __name__ == "__main__":
    run()
On startup, you’ll see:
  1. Database connection established (see src/config.py:22-26 for SQLAlchemy URL construction)
  2. Tables created if missing
  3. Default MQTT broker seeded if none exists (see src/db.py:20-35)
  4. Flows loaded and subscriptions established
  5. Message processing begins

Docker installation

Build the image

The included Dockerfile uses Python 3.12 slim as the base:
docker build -t mqtt-gateway:latest .
FROM python:3.12-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

RUN addgroup --system app && adduser --system --ingroup app app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]

Run the container

Run the gateway with environment variables:
docker run --rm \
  -e DB_HOST=192.168.0.137 \
  -e DB_PORT=3306 \
  -e DB_NAME=db \
  -e DB_USER=demo \
  -e DB_PASSWORD=demo \
  -e LOG_DIR=/app/log \
  -v "$(pwd)/log:/app/log" \
  --name mqtt-gateway \
  mqtt-gateway:latest

Volume mounts

Host pathContainer pathPurpose
$(pwd)/log/app/logPersist error logs outside the container
Logs are written in daily rotating files with format YYYY-MM-DD.log (e.g., 2026-03-03.log).

Docker networking considerations

If your MariaDB or MQTT broker is running on the host machine, use host.docker.internal instead of localhost in environment variables (on Docker Desktop for Mac/Windows).On Linux, use --network host or configure a bridge network.
Example for host-based services:
docker run --rm \
  -e DB_HOST=host.docker.internal \
  -e DB_PORT=3306 \
  -e DB_NAME=db \
  -e DB_USER=demo \
  -e DB_PASSWORD=demo \
  -v "$(pwd)/log:/app/log" \
  --add-host=host.docker.internal:host-gateway \
  mqtt-gateway:latest

Database schema

The gateway uses three tables, all created automatically:

mqtt_servers table

Stores MQTT broker connection details.
src/models.py
class MqttServer(Base):
    __tablename__ = "mqtt_servers"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    host: Mapped[str] = mapped_column(String(255), nullable=False)
    port: Mapped[int] = mapped_column(Integer, nullable=False, default=1883)
    username: Mapped[str | None] = mapped_column(String(255), nullable=True)
    password: Mapped[str | None] = mapped_column(String(255), nullable=True)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    is_default: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
On first run, a default broker at 192.168.0.137:1883 is inserted if no enabled broker exists (see src/db.py:20-35).

flows table

Defines message processing flows.
src/models.py
class Flow(Base):
    __tablename__ = "flows"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    description: Mapped[str] = mapped_column(String(255), nullable=False)
    topic: Mapped[str] = mapped_column(String(255), nullable=False)
    action: Mapped[str] = mapped_column(String(30), nullable=False)
    payload_schema: Mapped[dict] = mapped_column(JSON, nullable=False)
    endpoint_url: Mapped[str | None] = mapped_column(String(500), nullable=True)
    last_msg_id: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
Actions:
  • STORE_DB: Store payload attributes in the data table
  • POST_ENDPOINT: Forward payload to endpoint_url via HTTP POST

data table

Stores message attributes for STORE_DB flows.
src/models.py
class DataRecord(Base):
    __tablename__ = "data"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    received_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )
    flow_code: Mapped[str] = mapped_column(String(100), nullable=False)
    attribute_name: Mapped[str] = mapped_column(String(255), nullable=False)
    attribute_value: Mapped[str] = mapped_column(Text, nullable=False)
    last_msg_id: Mapped[int] = mapped_column(Integer, nullable=False)
Each message creates multiple rows—one per attribute. Group by last_msg_id to reconstruct the original message.

Logging

Error logs are written to the directory specified by LOG_DIR (default: ./log).
  • Format: YYYY-MM-DD.log (e.g., 2026-03-03.log)
  • Rotation: New file created daily at midnight UTC
  • Mode: Append mode during the day
Logs include:
  • MQTT connection failures (see src/mqtt_client.py:66-68)
  • Payload validation errors (see src/mqtt_client.py:80-82)
  • HTTP POST failures for POST_ENDPOINT flows (see src/processor.py:124-129)
  • Flow reload errors (see src/mqtt_client.py:122)

Health checks

The gateway does not expose an HTTP health endpoint by default. To monitor the service:
  1. Process monitoring: Check if python app.py is running
  2. Database queries: Verify last_msg_id is incrementing in the flows table
  3. Log monitoring: Watch for errors in LOG_DIR

Troubleshooting

Symptoms: Service exits with Fatal error during startup in logsSolutions:
  • Verify DB_HOST is accessible: ping 192.168.0.137
  • Test database connectivity: mysql -h DB_HOST -P DB_PORT -u DB_USER -p
  • Check firewall rules allow connections to port 3306
  • In Docker, ensure network mode allows access to host services
Symptoms: MQTT connection failed with reason code in logsSolutions:
  • Verify an enabled MQTT server exists: SELECT * FROM mqtt_servers WHERE enabled = 1
  • Test broker connectivity: telnet 192.168.0.137 1883
  • Check username and password in mqtt_servers if authentication is required
  • Verify the broker supports MQTT 3.1.1 (paho-mqtt compatibility)
Symptoms: Messages published but no data in data tableSolutions:
  • Verify flows are enabled: SELECT * FROM flows WHERE enabled = 1
  • Check topic patterns match: The gateway uses paho.mqtt.client.topic_matches_sub() for wildcard matching
  • Validate payload structure matches payload_schema
  • Check error logs for validation failures
Symptoms: Errors like Error posting flow X to endpoint Y in logsSolutions:
  • Verify endpoint_url is not NULL in the flow
  • Test the endpoint manually: curl -X POST http://endpoint -H "Content-Type: application/json" -d '{"test": "data"}'
  • Increase HTTP_TIMEOUT_SECONDS if the endpoint is slow
  • Check network connectivity from the gateway to the endpoint
Symptoms: New flows added to database but not subscribedSolutions:
  • Wait for FLOWS_RELOAD_INTERVAL_SECONDS (default: 600 seconds / 10 minutes)
  • Restart the service to force immediate reload
  • Check logs for Error reloading flows messages

Next steps

Configuration

Learn how to configure flows, schemas, and MQTT servers

API reference

Explore the database schema and available flow actions