Using Python and paho-mqtt

This example uses Python, a widely available language, to download some announcements and then retrieve the corresponding data. It needs only the paho-mqtt client library in addition to the Python standard library.

[13]:
import json
import paho.mqtt.client as mqtt
import random
import urllib.request
# b64decode is needed by getData() to decode inline base64 content.
from base64 import b64decode


host='localhost'
user='wis2box'
password='wis2box'

r = random.Random()
clientId='MyQueueName'+ f"{r.randint(1,1000):04d}"
# count of messages received so far; the demonstration stops
# once the count exceeds messageCountMaximum.
messageCount = 0
messageCountMaximum = 5

# maximum size of data download to print.
sizeMaximumThreshold = 1023

The cell above imports the required modules and sets the connection parameters. It assumes that a broker is running on localhost and is publishing messages. Message queueing protocols provide real-time notification of product availability.

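A commonly used Python package for subscribing to MQTT messages is paho-mqtt (imported as paho.mqtt.client). The package is callback-driven: the application registers functions that the client calls when events such as a connection or an incoming message occur.

Note that messageCount is used to limit the length of the demonstration, which would otherwise run indefinitely because messages arrive as a continuous flow.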

Let’s investigate our callbacks.

[14]:
def sub_connect(client, userdata, flags, rc, properties=None):
    print("on connection to subscribe: ", mqtt.connack_string(rc))
    for s in ["xpublic/#"]:
        client.subscribe(s, qos=1)

The sub_connect callback is called when the connection is established. This is where we subscribe to the topics we are interested in. Here the topic is xpublic/#, where / is the topic separator and # is a wildcard that matches any subtree of topics.
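To make the wildcard behaviour concrete, here is a minimal sketch of how an MQTT subscription filter is compared against a topic. The helper name topic_matches is hypothetical and is not part of paho-mqtt; the broker does this matching for you. The sketch also ignores special cases such as topics starting with $.

```python
def topic_matches(subscription, topic):
    """Return True if topic matches the MQTT subscription filter.

    '+' matches exactly one level; '#' matches all remaining levels
    (including none) and is only valid as the last level of the filter.
    Simplified sketch: '$'-prefixed topics are not treated specially.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            # multi-level wildcard: accept only when it ends the filter
            return i == len(sub_levels) - 1
        if i >= len(topic_levels):
            # the filter has more levels than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # without '#', both must have the same number of levels
    return len(sub_levels) == len(topic_levels)


print(topic_matches("xpublic/#", "xpublic/v03/WIS/pr/tjgu/surface/miscellaneous/pr"))
print(topic_matches("xpublic/#", "xprivate/v03/WIS"))
```

A narrower filter, for example one ending in surface/# under a given prefix, would reduce the flow to a single family of products.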

The qos=1 argument sets the Quality of Service level: 1 means that each message is delivered at least once, so a message may occasionally arrive more than once. qos=1 is recommended.

The next callback is called every time a message is received, and decodes and prints the message.

To keep the output short for the demonstration, we limit the subscriber to a few messages.

[15]:
def sub_message(client, userdata, msg):
    """
    print messages received.  Exit on count received.
    """

    global messageCount,messageCountMaximum

    m = json.loads(msg.payload.decode('utf-8'))

    print(f"message {messageCount} topic: {msg.topic} received: {m}")
    print(f"message {messageCount} data: {getData(m)}")

    messageCount += 1

    if messageCount > messageCountMaximum:
        client.disconnect()
        client.loop_stop()

The message handler above calls getData(), defined below. The messages are usually announcements of data availability and normally refer to the data with a link. When the data is small, a message can instead include the data itself (inline) in the content field. Here is a routine to obtain the data given an announcement message:

[16]:
def getData(m, sizeMaximum=1000):
    """
    given a message, return the data it refers to
    """

    if 'size' in m and m['size'] > sizeMaximum:
        return f" data too large {m['size']} bytes"
    elif 'content' in m:
        if m['content']['encoding'] == 'base64':
            return b64decode(m['content']['value'])
        else:
            return m['content']['value'].encode('utf-8')
    else:
        url = m['baseUrl'] + '/' + m['relPath']
        with urllib.request.urlopen(url) as response:
            return response.read()
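The two ways a message can carry its data can be tried without a broker. The sketch below mirrors the content branch of getData() in a standalone helper (inline_data, a hypothetical name) and feeds it hand-made announcements. The two inline messages are invented for illustration; the link message reuses the baseUrl and relPath fields of one announcement from the demonstration output.

```python
from base64 import b64decode, b64encode


def inline_data(m):
    """Return the bytes carried inline in an announcement, or None."""
    if 'content' not in m:
        return None
    content = m['content']
    if content['encoding'] == 'base64':
        return b64decode(content['value'])
    # any other encoding is treated as text, as in getData()
    return content['value'].encode('utf-8')


# Hypothetical announcements, built by hand for illustration only.
binary_msg = {'content': {'encoding': 'base64',
                          'value': b64encode(b'\x00\x01METAR').decode('ascii')}}
text_msg = {'content': {'encoding': 'utf-8', 'value': 'AAXX 24044'}}
link_msg = {
    'baseUrl': 'http://localhost:8999/data/20220224T05',
    'relPath': 'WIS/pr/tjgu/surface/miscellaneous/pr/'
               'SXPU52_TJGU_240418_a8f650c50a0c0e38a41b0867a011574f.txt',
    'size': 67,
}

print(inline_data(binary_msg))
print(inline_data(text_msg))
print(inline_data(link_msg))

# Without inline content, the data is fetched from this URL.
url = link_msg['baseUrl'] + '/' + link_msg['relPath']
print(url)
```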

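The calling code then registers the callbacks, connects to the broker, and starts the event loop. The constructor call follows the paho-mqtt 1.x API; paho-mqtt 2.0 added a callback_api_version argument to mqtt.Client, so the call may need adjusting on newer installations.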

[18]:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv5)
client.on_connect = sub_connect
client.on_message = sub_message
client.username_pw_set(user, password)
client.connect(host)

client.loop_forever()
on connection to subscribe: Connection Accepted.
message 0 topic: xpublic/v03/WIS/us/mobile_rgnl_al/surface/aviation/metar/us received: {'mode': '664', 'mtime': '20220224T052208.259097815', 'atime': '20220224T052208.259097815', 'pubTime': '20220224T052208.264983', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/us/mobile_rgnl_al/surface/aviation/metar/us/SAUS44_KMOB_240503_COR_8d674aab16213ac2b13fab2d79950456.txt', 'integrity': {'method': 'md5', 'value': 'jWdKqxYhOsKxP6steZUEVg=='}, 'size': 137}
message 0 data: b'SAUS44 KMOB 240503 COR\r\r\nMTRPRN\r\r\nMETAR KPRN 240458Z AUTO 20006G15KT 10SM OVC006 19/16 A3016 RMK AO2 \r\r\nSLP161 T01940161 402830183\r\r\n\r\r\n\x03'
message 1 topic: xpublic/v03/WIS/pr/tjgu/surface/miscellaneous/pr received: {'mode': '664', 'mtime': '20220224T052208.427098989', 'atime': '20220224T052208.427098989', 'pubTime': '20220224T052208.430775', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/pr/tjgu/surface/miscellaneous/pr/SXPU52_TJGU_240418_a8f650c50a0c0e38a41b0867a011574f.txt', 'integrity': {'method': 'md5', 'value': 'qPZQxQoMDjikGwhnoBFXTw=='}, 'size': 67}
message 1 data: b'SXPU52 TJGU 240418\r\r\nAAXX 24044\n78523 35/// /0503   30151 222//\r\r\n\x03'
message 2 topic: xpublic/v03/WIS/ca/canadian_met_centre/upperair/aircraft/airep/north-atlantic received: {'mode': '664', 'mtime': '20220224T052209.0511043072', 'atime': '20220224T052209.0511043072', 'pubTime': '20220224T052209.056451', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/ca/canadian_met_centre/upperair/aircraft/airep/north-atlantic/UANT01_CWAO_240503_2d512e655e32ce80001105dfa2fc19f0.txt', 'integrity': {'method': 'md5', 'value': 'LVEuZV4yzoAAEQXfovwZ8A=='}, 'size': 135}
message 2 data: b'UANT01 CWAO 240503\r\r\nARP BAW17V 5329N04306W 0503 F400 5400N04000W 0515 5400N03000W MS70\r\r\n 260/88 KT\r\r\nGZBKN DDL XXH 240503 L48A\r\r\n\r\r\n\x03'
message 3 topic: xpublic/v03/WIS/pr/tjgu/surface/miscellaneous/pr received: {'atime': '20220224T052208.435099125', 'mtime': '20220224T052208.435099125', 'mode': '664', 'pubTime': '20220224T052208.440895', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/pr/tjgu/surface/miscellaneous/pr/SXPU52_TJGU_240413_63e3ff1d1e3bc11b1f430024622ae5aa.txt', 'integrity': {'method': 'md5', 'value': 'Y+P/HR47wRsfQwAkYirlqg=='}, 'size': 67}
message 3 data: b'SXPU52 TJGU 240413\r\r\nAAXX 24044\n78523 35/// /0404   30151 222//\r\r\n\x03'
message 4 topic: xpublic/v03/WIS/us/wallops_i__wallops_station_va/surface/miscellaneous/nc received: {'mode': '664', 'atime': '20220224T052208.44309926', 'mtime': '20220224T052208.44309926', 'pubTime': '20220224T052208.445723', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/us/wallops_i__wallops_station_va/surface/miscellaneous/nc/SXNC50_KWAL_240503_99baec43c8b040b9e8496a762be9a891.txt', 'integrity': {'method': 'md5', 'value': 'mbrsQ8iwQLnoSWp2K+mokQ=='}, 'size': 132}
message 4 data: b'SXNC50 KWAL 240503\r\r\n\x1e326A9318 055050324 \r\n07.54 \r\n002 \r\n120 \r\n038 \r\n041 \r\n100 \r\n13.0 \r\n027.0 \r\n347 \r\n005 \r\n00000 \r\n 44+0NN  28W\r\r\n\x03'
message 5 topic: xpublic/v03/WIS/pr/tjgu/surface/miscellaneous/pr received: {'mode': '664', 'mtime': '20220224T052208.455099344', 'atime': '20220224T052208.455099344', 'pubTime': '20220224T052208.457988', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/pr/tjgu/surface/miscellaneous/pr/SXPU52_TJGU_240403_0034251607312a5feff05fd760128747.txt', 'integrity': {'method': 'md5', 'value': 'ADQlFgcxKl/v8F/XYBKHRw=='}, 'size': 67}
message 5 data: b'SXPU52 TJGU 240403\r\r\nAAXX 24044\n78523 35/// /0306   30151 222//\r\r\n\x03'
[18]:
7
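Each announcement in the output above carries an integrity field with a method and a value. In this sample the value appears to be the base64 encoding of an MD5 digest: for the first message it decodes to the same hexadecimal string that is embedded in the file name. Assuming that convention holds, a downloaded payload could be checked as sketched below. The helper name integrity_ok is hypothetical, and other checksum methods are deliberately left unhandled.

```python
import hashlib
from base64 import b64encode


def integrity_ok(m, data):
    """Compare data against the announcement's integrity field.

    Returns True or False for the 'md5' method, and None when the
    field is absent or uses a method this sketch does not handle.
    Assumes the value is the base64-encoded MD5 digest of the data.
    """
    integrity = m.get('integrity')
    if integrity is None or integrity.get('method') != 'md5':
        return None
    digest = hashlib.md5(data).digest()
    return b64encode(digest).decode('ascii') == integrity['value']


# Observation from the first message of the demonstration output:
# the hex digest in the file name and the integrity value agree.
hex_in_name = '8d674aab16213ac2b13fab2d79950456'
value_in_msg = 'jWdKqxYhOsKxP6steZUEVg=='
print(b64encode(bytes.fromhex(hex_in_name)).decode('ascii') == value_in_msg)

# Hypothetical announcement for an empty payload.
empty_msg = {'integrity': {'method': 'md5',
                           'value': '1B2M2Y8AsgTpgAmY7PhCfg=='}}
print(integrity_ok(empty_msg, b''))
```

In sub_message, such a check could be applied to the result of getData() before the data is used.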