Using Python and paho-mqtt

This example will use widely available and used python language and libraries to download some announcements, and then retrieve the corresponding data, using only the paho-mqtt client library, in addition to Python standard libraries.

[1]:
import json
import paho.mqtt.client as mqtt
import random
import urllib
import urllib.request


host='localhost'
user='wis2box'
password='wis2box'

r = random.Random()
clientId='MyQueueName'+ f"{r.randint(1,1000):04d}"
# number of messages to subscribe to.
messageCount = 0
messageCountMaximum = 5

# maximum size of data download to print.
sizeMaximumThreshold = 1023

The above imports the required modules. It is also assumed that localhost is set up and is publishing messages. Message queueing protocols provide real-time notification about availability of products.

The standard Python package used to subscribe to messages is paho-mqtt (paho.mqtt.client). The package uses callbacks.

Note that messageCount is used to limit the length of the demonstration (otherwise infinite, as it is a continuous flow).

Let’s investigate our callbacks.

[2]:
def sub_connect(client, userdata, flags, rc, properties=None):
    print("on connection to subscribe: ", mqtt.connack_string(rc))
    for s in ["xpublic/#"]:
        client.subscribe(s, qos=1)

The sub_connect callback needed is called when the connection is established, which required to subscribe to topics we are interested in (topics are: xpublic/#, where / is a topic separator and # is a wildcard for any tree of topics.

The qos=1 refers to Quality of Service, where 1 establishes reception of messages at least once. qos=1 is recommended.

The next callback is called every time a message is received, and decodes and prints the message.

To keep the output short for the demonstration, we limit the subscriber to a few messages.

[3]:
def sub_message(client, userdata, msg):
    """
    print messages received.  Exit on count received.
    """

    global messageCount,messageCountMaximum

    m = json.loads(msg.payload.decode('utf-8'))

    print(f"message {messageCount} topic: {msg.topic} received: {m}")
    print(f"message {messageCount} data: {getData(m)}")

    messageCount += 1

    if messageCount > messageCountMaximum:
        client.disconnect()
        client.loop_stop()

The message handler above calls the getData() (below). The messages themselves are usually announcements of data availability, but when data is small, they can include the data itself (inline) in the content field. Usually the message refers to the data using a link. Here is a routine to obtain the data given an announcement message:

[4]:
def getData(m, sizeMaximum=1000):
    """
    given a message, return the data it refers to
    """

    if 'size' in m and m['size'] > sizeMaximum:
        return f" data too large {m['size']} bytes"
    elif 'content' in m:
        if m['content']['encoding'] == 'base64':
            return b64decode(m['content']['value'])
        else:
            return m['content']['value'].encode('utf-8')
    else:
        url = m['baseUrl'] + '/' + m['relPath']
        with urllib.request.urlopen(url) as response:
            return response.read()

The calling code then registers the callbacks, connects to the broker, and starts the event loop:

[ ]:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv5)
client.on_connect = sub_connect
client.on_message = sub_message
client.username_pw_set(user, password)
client.connect(host)

client.loop_forever()
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
on connection to subscribe:  Connection Accepted.
[ ]: