MQTT

This page describes MQTT connectivity in more detail.

MQTT (Message Queuing Telemetry Transport) is a lightweight and widely adopted messaging protocol well-suited for resource-constrained devices.

If a device communicates over MQTT with akenza, an MQTT data flow has to be set up.

Authentication

There are two ways to authenticate. Device credentials use a per-device public/private key pair with JSON Web Tokens (JWTs, RFC 7519). Uplink secrets use a shared key per device connector that is passed as the password to the MQTT broker. The authentication type is selected when the device connector is created.

Refer to Device Security for a more in-depth introduction to the authentication options.

Using device credentials

Using the private key, a JWT has to be generated and sent as the MQTT password when setting up the connection to the akenza MQTT broker.

  • MQTT username: arbitrary username (not used). Depending on the framework, this needs to be set in order for the password to be sent as well

  • MQTT password: must be a valid JWT signed with the private key. Refer to Using JSON Web Tokens (JWTs)

  • MQTT client id: the clientId must be the deviceId

  • MQTT topic: a topic with the following structure

    • uplink topic: /up/device/id/{deviceId}

    • downlink topic: /down/device/id/{deviceId}/#
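The settings listed above can be gathered in one place before they are handed to an MQTT client. The following is a minimal sketch, not an akenza reference implementation: the class name `DeviceCredentialSettings` and the placeholder username `"unused"` are hypothetical, and the JWT is assumed to have been generated separately with the device's private key.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeviceCredentialSettings:
    """Connection settings for device-credential authentication (hypothetical helper)."""

    device_id: str
    jwt: str  # assumed to be a valid JWT signed with the device's private key

    @property
    def client_id(self) -> str:
        # The MQTT client id must be the device id
        return self.device_id

    @property
    def username(self) -> str:
        # Not used for authentication; set only so that clients also send the password
        return "unused"

    @property
    def password(self) -> str:
        # The JWT is sent as the MQTT password
        return self.jwt

    @property
    def uplink_topic(self) -> str:
        return f"/up/device/id/{self.device_id}"

    @property
    def downlink_topic(self) -> str:
        # The trailing "#" wildcard covers every downlink sub topic
        return f"/down/device/id/{self.device_id}/#"
```

With paho-mqtt, these values would map to the client id passed to the client constructor, to `username_pw_set(settings.username, settings.password)`, and to the topics used for publishing and subscribing.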

Using uplink secret

After the MQTT device connector is created, a secret is generated. This secret must be included in the topic of the uplink request.

  • MQTT username: the device connector id

  • MQTT password: the device connector secret

  • MQTT topic: a topic with the following structure

    • uplink topic: /up/<deviceConnectorSecret>/id/<deviceID>

    • downlink topic: /down/<deviceConnectorSecret>/id/<deviceID>/#
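As an illustration, both topics can be derived from the connector secret and the device id. This is a sketch under assumptions: the function name `uplink_secret_topics` is hypothetical, the downlink topic is assumed to use the `/down` prefix as in the downlink subscription example further down this page, and the client-side rejection of `/`, `+` and `#` is a precaution added here, not a documented akenza rule.

```python
def uplink_secret_topics(connector_secret: str, device_id: str) -> dict:
    """Build the uplink and downlink topics for uplink-secret authentication."""
    for name, value in (("connector_secret", connector_secret), ("device_id", device_id)):
        # "/", "+" and "#" have special meaning in MQTT topics, so reject them here
        # (an assumption of this sketch, made to avoid malformed topics)
        if not value or any(ch in value for ch in "/+#"):
            raise ValueError(f"{name} must be non-empty and free of '/', '+' and '#'")
    return {
        "uplink": f"/up/{connector_secret}/id/{device_id}",
        "downlink": f"/down/{connector_secret}/id/{device_id}/#",
    }
```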

Sending data with MQTT

uplink.py
import paho.mqtt.publish as publish

topic ="<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

publish.single(topic, payload, hostname=host, port=1883, auth={'username': mqtt_username, 'password': mqtt_password})

pip3 install paho-mqtt
python3 uplink.py

MQTT Sub Topics

It is possible to additionally specify a sub topic by appending it to the uplink topic, e.g. /up/device/id/{deviceId}/alerts will result in the akenza topic alerts. The subtopic will be used as the topic under which data is stored in akenza and is available in the uplink decoder script of the device type. If multiple levels of subtopics are specified, the resulting topic will be separated by underscores, e.g. /up/device/id/{deviceId}/alerts/critical will result in alerts_critical.
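The mapping from MQTT sub topic levels to the akenza topic can be sketched as follows. This only mirrors the rule described above and is not akenza's implementation: the function name `akenza_topic_from_uplink` is hypothetical, and returning `None` when no sub topic is present is an assumption, since the default topic is not specified here.

```python
from typing import Optional

# The uplink topic prefix has four levels: "up", "device" (or the connector
# secret), "id" and the device id. Everything after that is the sub topic.
UPLINK_PREFIX_LEVELS = 4


def akenza_topic_from_uplink(mqtt_topic: str) -> Optional[str]:
    """Derive the akenza topic from an MQTT uplink topic."""
    levels = mqtt_topic.strip("/").split("/")
    sub_levels = levels[UPLINK_PREFIX_LEVELS:]
    if not sub_levels:
        return None  # no sub topic given (assumption: caller applies the default)
    # Multiple sub topic levels are joined with underscores
    return "_".join(sub_levels)
```

For example, `akenza_topic_from_uplink("/up/device/id/dev01/alerts/critical")` yields `alerts_critical`.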

Subscribing to downlinks is possible by providing the following info:

  • MQTT username: the device connector id

  • MQTT password: the device connector secret

  • MQTT topic: a topic with the following structure /down/<deviceConnectorSecret>/id/<deviceID>

downlink.py
import paho.mqtt.client as mqtt
from paho.mqtt.client import error_string

host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
device_id= "<copy from Akenza Device Api configuration>"
topic = "/down/{0}/id/{1}/#".format(mqtt_password, device_id)

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    # rc is a CONNACK return code here, so it is decoded with connack_string
    print("Connected with result code {0}".format(mqtt.connack_string(rc)))

def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

def on_disconnect(client, userdata, rc):
    # rc == 0 means the client disconnected on purpose; anything else is unexpected
    if rc != 0:
        print("Unexpected disconnection. " + error_string(rc))

client = mqtt.Client("mqtt-test-client")
client.username_pw_set(mqtt_username, mqtt_password)
client.on_message = on_message
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(host, 1883)
client.subscribe(topic)  # Subscribe to the topic, receive any messages published on it

client.loop_forever()  # Block and process network traffic until the client disconnects

pip3 install paho-mqtt
python3 downlink.py

The downlink topic added in akenza is appended to the subscribed topic. For example, when providing "myTopic", the resulting downlink topic is /down/<deviceConnectorSecret>/id/<deviceID>/myTopic.
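To see why a subscription ending in `#` receives such downlinks, the following sketch composes the resulting topic and checks it against a subscription filter. Both function names are hypothetical, and `matches` is a simplified reimplementation of MQTT wildcard matching written for illustration (it ignores special cases such as `$`-prefixed topics).

```python
def downlink_topic(connector_secret: str, device_id: str, akenza_topic: str) -> str:
    """Compose the full MQTT topic on which a downlink arrives."""
    return f"/down/{connector_secret}/id/{device_id}/{akenza_topic}"


def matches(subscription: str, topic: str) -> bool:
    """Simplified MQTT filter matching supporting the '+' and '#' wildcards."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            return True  # multi-level wildcard matches everything that remains
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level differs
    return len(sub_levels) == len(topic_levels)
```

A client subscribed to `/down/<deviceConnectorSecret>/id/<deviceID>/#` therefore receives downlinks for every topic configured in akenza, and can inspect `message.topic` to tell them apart.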

Using TLS

akenza allows sending data using Transport Layer Security (TLS). For this, port 8883 has to be used.

import paho.mqtt.publish as publish
import ssl

topic ="<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

tls_config = {
    "cert_reqs": ssl.CERT_REQUIRED,
    "tls_version": ssl.PROTOCOL_TLSv1_2
}
publish.single(topic, payload, hostname=host, port=8883, tls = tls_config, auth={'username':mqtt_username, 'password':mqtt_password})

Refer to the paho-mqtt documentation for more information.

Why am I getting a certificate expired error?

akenza uses certificates that chain to the ISRG Root X1 root certificate provided by Let's Encrypt. If your device is older, its trust store might not contain an up-to-date root certificate. Acquire the ISRG Root X1 root certificate from Let's Encrypt and update your trust store accordingly.
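If the system trust store cannot be updated, one option is to point the client at a root certificate file directly. The sketch below uses only Python's standard `ssl` module; the function name `build_tls_context` and the idea of shipping the certificate as a local PEM file are assumptions made for illustration.

```python
import ssl
from typing import Optional


def build_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a TLS context that verifies the broker certificate.

    If ca_file is given, only the certificates in that PEM file are trusted;
    otherwise the system trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_file)
    # Refuse protocol versions older than TLS 1.2
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context
```

With a paho-mqtt client, the context can be applied via `client.tls_set_context(build_tls_context("isrgrootx1.pem"))` before connecting on port 8883, where `isrgrootx1.pem` is a hypothetical local path to the downloaded root certificate.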

Sending data with MQTT v5

import paho.mqtt.client as client
import paho.mqtt.publish as publish
import ssl

topic ="<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

tls_config = {
    "cert_reqs": ssl.CERT_REQUIRED,
    "tls_version": ssl.PROTOCOL_TLSv1_2
}

publish.single(topic, payload, hostname=host, port=8883, tls = tls_config, auth={'username':mqtt_username, 'password':mqtt_password}, protocol=client.MQTTv5)

MQTT v5 Content Types

With MQTT v5 it is possible to send different content types by setting the ContentType property. The supported MIME types are:

  • application/json

  • text/plain

  • application/octet-stream

  • application/xml

  • text/csv

If this property is not set, a published message will be interpreted as a JSON message.

import paho.mqtt.client as mqtt_client
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

topic = "<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
port = 1883  # unencrypted port; use 8883 together with a TLS configuration
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
client_id = "<an arbitrary client id>"
payload = "<?xml version='1.0' encoding='UTF-8'?><data><foo>bar</foo></data>"

# Declare the payload as XML via the MQTT v5 ContentType property
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/xml"

client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5)
client.username_pw_set(mqtt_username, mqtt_password)
client.connect(host, port)
client.loop_start()  # Run the network loop so the message is actually sent

info = client.publish(topic, payload, properties=properties)
info.wait_for_publish()  # Block until the message has been sent
client.loop_stop()
client.disconnect()
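The content type rule described above (one of the listed MIME types, falling back to JSON when the property is absent) can be mirrored on the client side before publishing. This is a sketch under assumptions: the function name `effective_content_type` is hypothetical, and rejecting unsupported types and normalizing case locally are choices of this example, not documented broker behavior.

```python
from typing import Optional

# MIME types listed as supported for the MQTT v5 ContentType property
SUPPORTED_CONTENT_TYPES = (
    "application/json",
    "text/plain",
    "application/octet-stream",
    "application/xml",
    "text/csv",
)
DEFAULT_CONTENT_TYPE = "application/json"


def effective_content_type(content_type: Optional[str]) -> str:
    """Return the content type a message will be interpreted as."""
    if content_type is None:
        # Without a ContentType property, the message is interpreted as JSON
        return DEFAULT_CONTENT_TYPE
    normalized = content_type.strip().lower()  # normalization is an assumption
    if normalized not in SUPPORTED_CONTENT_TYPES:
        raise ValueError(f"unsupported content type: {content_type!r}")
    return normalized
```

The returned value can then be assigned to `properties.ContentType` when building the publish properties.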

Examples

A set of examples for using MQTT with akenza can be found here:
