MQTT
This page describes MQTT connectivity in more detail.
MQTT (Message Queuing Telemetry Transport) is a lightweight and widely adopted messaging protocol well-suited for resource-constrained devices.
If a device communicates over MQTT with akenza, an MQTT data flow has to be set up.
There are two ways to authenticate. Device credentials use a per-device public/private key pair with JSON Web Tokens (JWTs, RFC 7519). Uplink secrets use a shared key per device connector that is passed as the password to the MQTT broker. The authentication type is selected when the device connector is created.
Using device credentials
Using the private key, a JWT has to be generated and sent as the MQTT password when setting up the connection to the akenza MQTT broker.
- MQTT username: arbitrary username (not used). Depending on the framework, this needs to be set in order for the password to be sent as well
- MQTT password: must be a valid JWT signed with the private key. Refer to Using JSON Web Tokens (JWTs)
- MQTT client id: the clientId must be the deviceId
- MQTT topic: a topic with the following structure
  - uplink topic: /up/device/id/{deviceId}
  - downlink topic: /down/device/id/{deviceId}/#
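The settings above can be sketched in a few lines of Python. This is a minimal illustration, not akenza's reference implementation: the helper names (`b64url`, `build_jwt`, `connection_parameters`) are hypothetical, the `sign` callable stands in for a real private-key signature from a crypto library, and the claim set (`iat`, `exp`) and the algorithm name `ES256` are assumptions. The claims and algorithms akenza actually requires are described under Using JSON Web Tokens (JWTs).

```python
import base64
import json
import time
from typing import Callable, Dict


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used by the JWT compact form."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_jwt(claims: Dict[str, object], alg: str,
              sign: Callable[[bytes], bytes]) -> str:
    """Assemble header.payload.signature.

    `sign` is a caller-supplied function (hypothetical here) that signs the
    given bytes with the device's private key.
    """
    header = {"alg": alg, "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = sign(signing_input.encode("ascii"))
    return f"{signing_input}.{b64url(signature)}"


def connection_parameters(device_id: str, jwt_token: str) -> Dict[str, str]:
    """Collect the MQTT settings listed above for device credentials."""
    return {
        "client_id": device_id,   # the clientId must be the deviceId
        "username": "unused",     # arbitrary, only set so the password is sent
        "password": jwt_token,    # the signed JWT
        "uplink_topic": f"/up/device/id/{device_id}",
        "downlink_topic": f"/down/device/id/{device_id}/#",
    }


if __name__ == "__main__":
    now = int(time.time())
    claims = {"iat": now, "exp": now + 3600}  # assumed claim set
    # Placeholder only: a real device signs with its private key here
    token = build_jwt(claims, "ES256", lambda data: b"replace-with-real-signature")
    print(connection_parameters("<deviceId>", token))
```

The returned dictionary maps directly onto the client id, username, password and topic arguments of whichever MQTT client library is used.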
Using uplink secret
After the creation of the MQTT device connector, a secret is generated which needs to be provided in the topic structure of the uplink request.
- MQTT username: the device connector id
- MQTT password: the device connector secret
- MQTT topic: a topic with the following structure
  - uplink topic: /up/<deviceConnectorSecret>/id/<deviceID>
  - downlink topic: /down/<deviceConnectorSecret>/id/<deviceID>/#
uplink.py
import paho.mqtt.publish as publish

topic = "<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

# Connect, publish a single message and disconnect again (unencrypted, port 1883)
publish.single(topic, payload, hostname=host, port=1883,
               auth={'username': mqtt_username, 'password': mqtt_password})
pip3 install paho-mqtt
python3 uplink.py
It is possible to additionally specify a subtopic by appending it to the uplink topic, e.g. /up/device/id/{deviceId}/alerts will result in the akenza topic alerts. The subtopic is used as the topic under which data is stored in akenza and is available in the uplink decoder script of the device type. If multiple levels of subtopics are specified, the levels of the resulting topic are joined by underscores, e.g. /up/device/id/{deviceId}/alerts/critical will result in alerts_critical.
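The subtopic rule can be illustrated with a small helper. This is only a sketch of the mapping as described here, not akenza's actual implementation: the function name `akenza_topic_from_uplink` is hypothetical, and returning `None` when no subtopic is present is an assumption, since the text does not say which topic is used in that case.

```python
from typing import Optional


def akenza_topic_from_uplink(mqtt_topic: str) -> Optional[str]:
    """Derive the akenza topic from an uplink MQTT topic.

    Everything after /up/<x>/id/<deviceId> is treated as the subtopic and
    its levels are joined with underscores.
    """
    levels = mqtt_topic.strip("/").split("/")
    if len(levels) < 4 or levels[0] != "up" or levels[2] != "id":
        raise ValueError(f"not an uplink topic: {mqtt_topic!r}")
    sub_levels = levels[4:]
    # Assumption: no subtopic means "no explicit akenza topic" (None)
    return "_".join(sub_levels) if sub_levels else None


if __name__ == "__main__":
    print(akenza_topic_from_uplink("/up/device/id/myDevice/alerts"))
    print(akenza_topic_from_uplink("/up/device/id/myDevice/alerts/critical"))
```

Because only the positions of the topic levels are inspected, the same helper also covers the uplink-secret topic layout.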
Subscribing to downlinks is possible by providing the following info:
- MQTT username: the device connector id
- MQTT password: the device connector secret
- MQTT topic: a topic with the following structure: /down/<deviceConnectorSecret>/id/<deviceID>
downlink.py
import paho.mqtt.client as mqtt
from paho.mqtt.client import connack_string, error_string

host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
device_id = "<copy from Akenza Device Api configuration>"
# The trailing "#" wildcard matches every downlink topic of the device
topic = "/down/{0}/id/{1}/#".format(mqtt_password, device_id)


def on_connect(client, userdata, flags, rc):
    # Called when the broker answers the connection request
    print("Connected with result code {0}".format(connack_string(rc)))


def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
          + message.topic + "' with QoS " + str(message.qos))


def on_disconnect(client, userdata, rc):
    # rc == 0 means the client asked to disconnect; anything else is unexpected
    if rc != 0:
        print("Unexpected disconnection. " + error_string(rc))


# Written against the paho-mqtt 1.x callback API; paho-mqtt 2.x additionally
# expects a callback API version as the first argument of mqtt.Client()
client = mqtt.Client("mqtt-test-client")
client.username_pw_set(mqtt_username, mqtt_password)
client.on_message = on_message
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(host, 1883)
client.subscribe(topic)  # Subscribe to the topic, receive any messages published on it
client.loop_forever()  # Run the network loop, blocks until the client disconnects
pip3 install paho-mqtt
python3 downlink.py
The downlink topic configured in akenza is appended to the subscribed topic. For example, when providing "myTopic", the resulting downlink topic is /down/<deviceConnectorSecret>/id/<deviceID>/myTopic.
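To see why a subscription ending in /# receives downlinks regardless of the topic chosen in akenza, the following sketch builds the full downlink topic and checks it against the subscription. Both helper names are hypothetical, and the matcher is a simplified version of MQTT topic-filter matching that ignores special cases such as topics starting with "$". With paho-mqtt installed, `paho.mqtt.client.topic_matches_sub` performs the same kind of check.

```python
def downlink_topic(secret: str, device_id: str, akenza_topic: str = "") -> str:
    """Build the topic on which a downlink is published (hypothetical helper)."""
    base = f"/down/{secret}/id/{device_id}"
    return f"{base}/{akenza_topic}" if akenza_topic else base


def matches_subscription(subscription: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching supporting '+' and '#'."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(sub_levels):
        if level == "#":
            return True  # multi-level wildcard matches everything below
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(sub_levels) == len(topic_levels)


if __name__ == "__main__":
    subscription = downlink_topic("<deviceConnectorSecret>", "<deviceID>", "#")
    published = downlink_topic("<deviceConnectorSecret>", "<deviceID>", "myTopic")
    print(matches_subscription(subscription, published))
```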
akenza allows sending data using Transport Layer Security (TLS). For this, port 8883 has to be used.
import paho.mqtt.publish as publish
import ssl

topic = "<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

# Require a valid server certificate and use TLS 1.2
tls_config = {
    "cert_reqs": ssl.CERT_REQUIRED,
    "tls_version": ssl.PROTOCOL_TLSv1_2
}

publish.single(topic, payload, hostname=host, port=8883, tls=tls_config,
               auth={'username': mqtt_username, 'password': mqtt_password})
Why am I getting a certificate expired error?
akenza uses certificates that are signed by the ISRG Root X1 root certificate provided by Let's Encrypt. If your device is older, it might not contain an up-to-date root certificate. Acquire the root certificate from Let's Encrypt and update your trust store accordingly.
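Where the system trust store cannot be updated, one option in Python is to point the TLS context at a downloaded root certificate file. The sketch below uses only the standard `ssl` module. The function name `make_tls_context` and the file name in the usage comment are hypothetical, and whether this fits a given device depends on its runtime.

```python
import ssl
from typing import Optional


def make_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a verifying TLS context.

    With ca_file=None the system trust store is used; otherwise only the
    certificates in the given PEM file are trusted.
    """
    context = ssl.create_default_context(cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


# Usage with paho-mqtt (the file path is a placeholder):
#   client.tls_set_context(make_tls_context("isrgrootx1.pem"))
#   client.connect("mqtt.akenza.io", 8883)
```

`ssl.create_default_context` enables certificate verification and hostname checking by default, so the context rejects expired or untrusted certificates.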
import paho.mqtt.client as client
import paho.mqtt.publish as publish
import ssl

topic = "<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
payload = '{"foo":"bar"}'

tls_config = {
    "cert_reqs": ssl.CERT_REQUIRED,
    "tls_version": ssl.PROTOCOL_TLSv1_2
}

# protocol=client.MQTTv5 selects MQTT v5 instead of the default MQTT v3.1.1
publish.single(topic, payload, hostname=host, port=8883, tls=tls_config,
               auth={'username': mqtt_username, 'password': mqtt_password},
               protocol=client.MQTTv5)
With MQTT v5 it is possible to send different content types by setting the ContentType property. The supported MIME types are:
application/json
text/plain
application/octet-stream
application/xml
text/csv
If this property is not set, a published message will be interpreted as a JSON message.
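import paho.mqtt.client as mqtt_client
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

topic = "<copy from Akenza Device Api configuration>"
host = "mqtt.akenza.io"
port = 1883
mqtt_username = "<copy from Akenza Device Api configuration>"
mqtt_password = "<copy from Akenza Device Api configuration>"
client_id = "<an arbitrary client id>"
payload = "<?xml version='1.0' encoding='UTF-8'?><data><foo>bar</foo></data>"

# Set the MQTT v5 ContentType property so the payload is not interpreted as JSON
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/xml"

# Written against the paho-mqtt 1.x API; paho-mqtt 2.x additionally expects a
# callback API version as the first argument of Client()
client = mqtt_client.Client(client_id, protocol=mqtt_client.MQTTv5)
client.username_pw_set(mqtt_username, mqtt_password)
client.connect(host, port)
client.loop_start()  # Run the network loop in the background so the message is sent
info = client.publish(topic, payload, properties=properties)
info.wait_for_publish()  # Block until the message has left the client
client.disconnect()
client.loop_stop()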
A set of examples for using MQTT with akenza can be found here: