NATS.io Queue Group REST API w/ Sanic and Python3+

Pub/Sub, Request-Reply, Load Balancing … Oh My…

Estimated Reading Time: 20 minutes
Difficulty: Intermediate

This post is purely illustrative and educational. It showcases a working version of a Queue Group + Request-Reply pattern implementation in Python3+ with NATS.io and Sanic. This code is not meant to be used in production as is; it should be modified and/or rewritten to meet the requirements and standards of a production-level implementation.

NATS is a connective technology much like Kafka or Kinesis, though far simpler to use and adopt. The Publish-Subscribe pattern is easy to understand for anyone who has worked with a message queue or authored/used a file watcher, even without exposure to “streaming” technologies. NATS is open source and easy to adopt, especially with its built-in asynchronous patterns. However, the goal here is not to explain NATS ad infinitum, to add to the pool of NATS explanations across the web, or to advocate for it over alternatives.

The goal is to showcase one of the many built-in patterns in NATS and accrue hands-on time with the technology to better understand it.

What is a Queue Group in 3 sentences or less?

A Queue Group is essentially a pool of subscribers load-balanced on a NATS subject. Adding two identical subscriptions to a subject results in both receiving every message. If those same subscriptions are members of a Queue Group, each message is delivered to one and only one randomly selected subscriber in the pool.
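To make the difference between the two delivery modes concrete, here is a toy, in-memory model. It is not the NATS client API: ToyBroker, subscribe and publish are hypothetical names, and the random pick within a group simply mirrors the behavior described above.

```python
import random
from collections import defaultdict
from typing import Callable, Optional

# a subscriber callback receives the message payload
Handler = Callable[[str], None]


class ToyBroker:
    """In-memory stand-in for NATS subject delivery (hypothetical, illustration only)."""

    def __init__(self, seed: Optional[int] = None) -> None:
        self._rng = random.Random(seed)
        # subject -> plain subscribers; each one receives every message
        self._plain: dict[str, list[Handler]] = defaultdict(list)
        # subject -> queue name -> members; one member per group receives each message
        self._groups: dict[str, dict[str, list[Handler]]] = defaultdict(lambda: defaultdict(list))

    def subscribe(self, subject: str, cb: Handler, queue: Optional[str] = None) -> None:
        if queue is None:
            self._plain[subject].append(cb)
        else:
            self._groups[subject][queue].append(cb)

    def publish(self, subject: str, data: str) -> None:
        # fan-out: every plain subscriber gets its own copy
        for cb in self._plain[subject]:
            cb(data)
        # queue groups: exactly one randomly chosen member of each group gets it
        for members in self._groups[subject].values():
            self._rng.choice(members)(data)
```

For example, with two plain subscribers on one subject and three queue subscribers in a group on another, publishing six messages to each gives both plain subscribers all six, while the three group members split the six between them.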

What is a valid use case for Queue Groups?

One of the designs I have favored is completely decoupling message-validation IO from computationally intensive APIs. This prevents invalid messages from entering and/or leaving a system, and offloads validation (both light and heavy) to a separate tier that responds immediately, rather than bundling validation with valuable compute.

This design can be seen in API Gateway, where a JSON Schema can be defined and executed prior to invoking a Lambda or transmitting to another AWS Service.
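A validation tier of this kind could start as small as the function below: it rejects malformed payloads with an immediate error reply and lets only well-formed messages through to the expensive tier. This is a hypothetical sketch; the required fields (id, payload) are assumptions, and a real system would more likely use a full JSON Schema validator, as API Gateway does.

```python
import json
from typing import Any, Optional

# hypothetical contract: every message must be a JSON object with these typed fields
REQUIRED_FIELDS = {"id": int, "payload": str}


def validate(raw: bytes) -> tuple[Optional[dict[str, Any]], Optional[str]]:
    """Return (message, None) if valid, else (None, error) to send back immediately."""
    try:
        message = json.loads(raw)
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        return None, "body is not valid JSON"
    if not isinstance(message, dict):
        return None, "body must be a JSON object"
    for name, expected in REQUIRED_FIELDS.items():
        value = message.get(name)
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            return None, f"field '{name}' must be of type {expected.__name__}"
    return message, None
```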

For this implementation, the goal is much simpler: to gauge how difficult Queue Groups are to implement and to observe them in action.

Let’s get started!

There are three parts needed to prove this pattern and its usefulness:

  1. A running NATS Server. NATS is extremely easy to install and configure with Docker.
  2. An API Framework. Python3+ is used for this implementation, along with the Sanic Framework for the API, as it is quick to get up and running.
  3. A Subscriber that listens to a NATS subject and responds using NATS Request-Reply.

With these 3 components, implementing an API with Request-Reply to a NATS Subject with Subscribers is extremely straightforward.
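The snippets that follow read their configuration from a settings module whose values come from the environment variables SERVER, DURATION, NATS_SUBJECT and NATS_QUEUE_NAME. That module is not reproduced in this post, so the following is a hypothetical stand-in; every default value is an assumption chosen for local experimentation.

```python
# settings.py: hypothetical stand-in for the project's settings module
import os

# NATS server URL passed to nc.connect(servers=...)
SERVER: str = os.environ.get("SERVER", "nats://localhost:4222")

# seconds each subscriber waits before replying (simulated work)
DURATION: float = float(os.environ.get("DURATION", "1"))

# subject the API sends requests on and the subscribers listen to
NATS_SUBJECT: str = os.environ.get("NATS_SUBJECT", "demo.requests")

# queue group name shared by every subscriber process
NATS_QUEUE_NAME: str = os.environ.get("NATS_QUEUE_NAME", "demo-workers")
```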

Create the Subscriber

Subscriber implementation can be seen HERE if TLDR

The subscriber should be able to identify itself to the requestor in its reply, distinguishing itself from the other subscribers in the Queue Group. If created correctly, multiple requests to a single NATS subject with multiple queued subscribers should each be answered by a randomly selected subscriber.

Step #1 Identify the Subscriber

A UUID could be used, but what would be the fun in that? Boring! Random human-readable “slugs” (names) make it much easier to see which subscriber responded, without having to manually compare the first few characters of a UUID. For this, a Python library called ‘coolname’ was used.

from coolname import generate_slug
# generate a coolname for this subscriber
subscriptionName : str = generate_slug(2)

Passing 2 as the parameter to generate_slug(...) generates two-word names like ‘macho-baboon’ or ‘coy-turtle’.
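If adding a dependency is not desirable, a few lines of the standard library produce similar names. This is a hypothetical substitute for coolname, not its implementation; the word lists are arbitrary and much smaller than coolname's, so name collisions between subscribers are more likely.

```python
import random
from typing import Optional

# arbitrary, deliberately small word lists (coolname ships far larger curated ones)
ADJECTIVES = ["macho", "coy", "analytic", "brave", "quiet", "nimble"]
ANIMALS = ["baboon", "turtle", "honeybee", "otter", "falcon", "lynx"]


def simple_slug(rng: Optional[random.Random] = None) -> str:
    """Return an 'adjective-animal' name such as 'coy-turtle'."""
    rng = rng or random.Random()
    return f"{rng.choice(ADJECTIVES)}-{rng.choice(ANIMALS)}"
```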

Step #2 Create a callback w/ an adjustable delay

While there is no shortage of computationally intensive algorithms that could slow responses, the goal is to prove the load-balancing behavior rather than CPU performance, which varies from one environment to another.

To keep it simple, all that is needed is a delay (an async delay, to be exact), after which the message passed to the callback is used to send a reply to its NATS reply-to subject. An async delay is used because this type of event-based programming lends itself to an async environment like Python’s asyncio.

Asynchronous programming was favored as I have become used to it over years of writing NodeJS APIs. When applicable, use the best tool for the job.
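The difference between an async delay and a blocking one is easy to see in isolation. The sketch below uses plain asyncio with no NATS involved, and the 0.1-second delay is arbitrary: it runs three "handlers" concurrently. With await asyncio.sleep they overlap and finish in about one delay; with time.sleep each one blocks the event loop, so they run back to back and take about three delays.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def async_handler(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop while waiting


async def blocking_handler(delay: float) -> None:
    time.sleep(delay)  # blocks the whole event loop; no other task can run


async def run_three(handler: Callable[[float], Awaitable[None]], delay: float = 0.1) -> float:
    """Run three handlers concurrently and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(3)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"asyncio.sleep: {asyncio.run(run_three(async_handler)):.2f}s")
    print(f"time.sleep:    {asyncio.run(run_three(blocking_handler)):.2f}s")
```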

import asyncio

from nats.aio.client import Client as NATS

import settings  # the project's settings module (reads environment variables)

# this method executes an async delay for the duration defined in the environment variable : DURATION
async def delayedResponse(msg : NATS.msg_class) -> None :
    # subscriptionName is the module-level slug generated in Step #1
    global subscriptionName

    # important to use await here, this is a non-blocking sleep
    await asyncio.sleep(settings.DURATION)

    messageCount : str = msg.data.decode()
    replyMessage : str = "Message %s received by '%s'." % (messageCount, subscriptionName)
    print(replyMessage)

    # using the Request-Reply pattern, msg.respond publishes a message to the reply-to subject (message must be in bytes)
    await msg.respond(replyMessage.encode())

Note: ‘messageCount’ can be thought of as a ‘message-id’; it may be renamed after this article is published.
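Because the callback only touches msg.data and msg.respond(), its contract can be exercised without a NATS server. The sketch below uses a hypothetical FakeMsg class in place of the client's message object, a fixed slug, and a short hard-coded delay in place of settings.DURATION; it mirrors the callback above rather than importing it.

```python
import asyncio
from dataclasses import dataclass, field

SUBSCRIPTION_NAME = "coy-turtle"  # stand-in for the generated slug
DURATION = 0.05  # stand-in for settings.DURATION, in seconds


@dataclass
class FakeMsg:
    """Hypothetical stand-in exposing only the two members the callback uses."""
    data: bytes
    replies: list[bytes] = field(default_factory=list)

    async def respond(self, payload: bytes) -> None:
        self.replies.append(payload)  # record instead of publishing to a reply subject


async def delayed_response(msg: FakeMsg) -> None:
    await asyncio.sleep(DURATION)  # non-blocking simulated work
    message_count = msg.data.decode()
    reply = "Message %s received by '%s'." % (message_count, SUBSCRIPTION_NAME)
    await msg.respond(reply.encode())


if __name__ == "__main__":
    m = FakeMsg(b"7")
    asyncio.run(delayed_response(m))
    print(m.replies[0].decode())  # prints: Message 7 received by 'coy-turtle'.
```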

Step #3 Create the subscriber

This was the most interesting implementation hurdle. There are plenty of extremely lengthy Python examples for creating a long-running NATS subscriber. They were passed over in favor of a simple while loop with asyncio.sleep (exactly how I would have done it in NodeJS).

With any long-running process like this, it is important to unsubscribe and close the NATS connection to prevent orphaned connections. It is common to close every process without doing so, only to find that messages sent to the subject continue returning responses.

IMPORTANT: the most essential part of this function is setting ‘queue’. This is how load balancing is achieved within the Queue Group.

import asyncio
import sys

from nats.aio.client import Client as NATS

import settings  # the project's settings module (reads environment variables)

# this is the main subscriber, note that it is asynchronous as there are multiple awaits within needed by NATS
async def main () -> None :
    # identify subscriber
    global subscriptionName
    print("Starting subscriber '%s'." % (subscriptionName))

    # provide access to the global closeConnection flag for use in the keep-alive loop
    global closeConnection

    # NOTE - this is the asyncio-compatible NATS client, not the synchronous NATS functionality
    nc = NATS()

    # connect to the server defined in the environment variable : SERVER
    await nc.connect(servers=settings.SERVER)
    print("Connection to NATS Server '%s' established by '%s'." % (settings.SERVER, subscriptionName))

    # create the subscription to the subject defined in the environment variable : NATS_SUBJECT
    subscription = await nc.subscribe(
        settings.NATS_SUBJECT,

        # callback to the delayed response method defined above
        cb=delayedResponse,

        # IMPORTANT -> specifying queue makes every instance of this process join the queue group
        # named in the environment variable : NATS_QUEUE_NAME, so messages are load balanced across them
        queue=settings.NATS_QUEUE_NAME
    )

    # while the global variable closeConnection is False, loop every one second to keep alive
    # this can be a larger amount if needed, it is set to 1 for quick exiting only
    while not closeConnection :
        await asyncio.sleep(1)

    ###################### CLEANUP and EXIT ######################

    # unsubscribe from the above NATS subscription
    await subscription.unsubscribe()
    print("\nUnsubscribed '%s' from '%s'." % (subscriptionName, settings.NATS_SUBJECT))

    # close the NATS connection, there have been instances where a process was not properly closed w/ orphan connections
    await nc.close()
    print("Closed connection to NATS Server '%s'." % (settings.SERVER))

    # exit this process
    sys.exit()
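The snippet above does not show where closeConnection is set to True; a signal handler is a common way to do it. As an alternative to polling a global once per second, the sketch below waits on an asyncio.Event that SIGINT/SIGTERM handlers set, then runs the cleanup steps in order. The function names are hypothetical, and loop.add_signal_handler is only available on Unix event loops.

```python
import asyncio
import signal
from typing import Awaitable, Callable, Sequence

# a cleanup step is any zero-argument coroutine function
Cleanup = Callable[[], Awaitable[None]]


def install_stop_handlers(stop: asyncio.Event) -> None:
    """Set `stop` on Ctrl+C (SIGINT) or SIGTERM; Unix event loops only."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)


async def wait_then_cleanup(stop: asyncio.Event, steps: Sequence[Cleanup]) -> None:
    """Block until `stop` is set, then await each cleanup step in order."""
    await stop.wait()
    for step in steps:
        await step()
```

Inside main(), the keep-alive loop and cleanup could then become: create stop = asyncio.Event(), call install_stop_handlers(stop), and await wait_then_cleanup(stop, [subscription.unsubscribe, nc.close]).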

Create the API

Sanic API implementation can be seen HERE if TLDR

The API is the simplest part of the implementation, as it is just a wrapper around NATS Request-Reply mechanics; it is essentially a NATS HTTP proxy. The before_server_start and before_server_stop listeners handle initialization and cleanup respectively. They are omitted here as immaterial to the purposes of this article, but their code can be seen in the link above.

That is all there is to it! The app.ctx.nc.request(...) call on line 10 of the handler below is NATS Request-Reply in action! This makes me 🤦🏽‍♂️ (facepalm) at how simple this is. It is almost too simple.

Keep It Short and Simple (the KISS principle): I hope developers take note. Implementations should be this simple in 2023 and beyond (and should have been this simple in most libraries a decade or more ago).

# basic root endpoint http://<SERVER>:<PORT> (defaults to http://localhost:8000)
@app.get("/")
async def root_request(request : Request):
    try:
        # increment the message count (to identify messages when multiple subscribers are attached)
        app.ctx.messageCount += 1
        messageId : int = app.ctx.messageCount  # local copy; concurrent requests keep incrementing the shared counter

        # use NATS Request/Reply to publish to NATS_SUBJECT (one queue-group member receives it; its reply comes back to this request)
        msg : NATS.msg_class = await app.ctx.nc.request(settings.NATS_SUBJECT, str(messageId).encode(), timeout=60)

        # return the subscriber response to the requestor (the reply payload is bytes, so decode it as UTF-8)
        return text(msg.data.decode())

    except asyncio.TimeoutError:

        # there should not be timeouts unless a large duration is set
        print("Timed out waiting for response")
        return text("Request timed out sending message %s." % (messageId))
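Under the hood, request-reply is ordinary publish-subscribe plus a reply subject: the requester listens on a unique inbox, publishes its request with that inbox as the reply-to, and waits (with a timeout) for the first message to arrive there. The toy, in-process model below illustrates that flow with asyncio. It is not how nats-py is implemented internally; ToyBus, its methods and the exact inbox naming are simplifications made up for this sketch.

```python
import asyncio
import uuid
from typing import Awaitable, Callable, Optional

# a handler receives (payload, reply_subject)
Handler = Callable[[bytes, Optional[str]], Awaitable[None]]


class ToyBus:
    """In-process pub/sub with reply subjects (illustrative only, not nats-py)."""

    def __init__(self) -> None:
        self._subs: dict[str, list[Handler]] = {}
        self._tasks: set[asyncio.Task] = set()  # keep delivery tasks referenced until done

    def subscribe(self, subject: str, cb: Handler) -> None:
        self._subs.setdefault(subject, []).append(cb)

    async def publish(self, subject: str, data: bytes, reply: Optional[str] = None) -> None:
        # deliver asynchronously to every subscriber, as a broker would
        for cb in list(self._subs.get(subject, [])):
            task = asyncio.create_task(cb(data, reply))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def request(self, subject: str, data: bytes, timeout: float) -> bytes:
        inbox = f"_INBOX.{uuid.uuid4().hex}"  # unique reply subject for this request
        future: asyncio.Future = asyncio.get_running_loop().create_future()

        async def on_reply(payload: bytes, _reply: Optional[str]) -> None:
            if not future.done():
                future.set_result(payload)  # the first reply wins

        self.subscribe(inbox, on_reply)
        try:
            await self.publish(subject, data, reply=inbox)
            # raises asyncio.TimeoutError if nobody replies in time
            return await asyncio.wait_for(future, timeout)
        finally:
            self._subs.pop(inbox, None)  # drop the one-off inbox subscription


async def demo() -> str:
    bus = ToyBus()

    async def responder(data: bytes, reply: Optional[str]) -> None:
        if reply is not None:
            await bus.publish(reply, b"echo: " + data)

    bus.subscribe("demo.requests", responder)
    reply = await bus.request("demo.requests", b"1", timeout=1.0)
    return reply.decode()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints: echo: 1
```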

Let's run this thing!

Running this application is straightforward. Follow the steps in the README.md of the repository for prerequisites. Once the environment is ready for a run…

Step #1 Launch a Subscriber

Use a new terminal/console for each Subscriber. In each terminal, run make subscriber (or python3 subscriber.py directly). Creating multiple separate Subscriber processes forms the Queue Group, providing the load-balancing capabilities we set out to prove.

my-username@a-computer-name nats-queue-api-python % make subscriber

python3 subscriber.py

Starting subscriber 'analytic-honeybee'.
Connection to NATS Server 'localhost' established by 'analytic-honeybee'.

Note that the make command is merely a python3 command underneath, and that the subscriber name was randomly generated as described in the Subscriber creation section above.

NOTE: open multiple terminals/consoles to the root of this project and issue the ‘make subscriber’ command in each one to scale up the Queue Group.
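Opening terminals by hand works fine, but a small launcher can start several subscriber processes at once. This is a hypothetical helper, not part of the repository: it assumes subscriber.py sits in the current directory, and it stops the processes by sending SIGINT (the equivalent of pressing Ctrl+C in each terminal), which is POSIX-specific.

```python
import signal
import subprocess
import sys
from typing import Optional, Sequence


def launch_subscribers(count: int, command: Optional[Sequence[str]] = None) -> list[subprocess.Popen]:
    """Start `count` independent copies of the subscriber command."""
    cmd = list(command) if command else [sys.executable, "subscriber.py"]
    return [subprocess.Popen(cmd) for _ in range(count)]


def stop_subscribers(procs: list[subprocess.Popen], timeout: float = 5.0) -> None:
    """Send SIGINT (like Ctrl+C) to each running process, then wait for it to exit."""
    for proc in procs:
        if proc.poll() is None:
            proc.send_signal(signal.SIGINT)
    for proc in procs:
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()  # last resort if a process ignores SIGINT
            proc.wait()
```

Because every copy runs the same code with the same NATS_QUEUE_NAME, each one joins the same Queue Group, exactly as the separate terminals do.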

Step #2 Launch the Sanic API

This is a standard Sanic API launch. Ensure that a terminal/console is open to the root of this project and run the command make api.

my-username@a-computer-name nats-queue-api-python % make api

sanic api:app
[2023-05-21 21:07:06 -0400] [52986] [INFO] 
  ┌──────────────────────────────────────────────────────────────────┐
  │                          Sanic v23.3.0                           │
  │                Goin' Fast @ http://127.0.0.1:8000                │
  ├───────────────────────┬──────────────────────────────────────────┤
  │                       │     mode: production, single worker      │
  │     ▄███ █████ ██     │   server: sanic, HTTP/1.1                │
  │    ██                 │   python: 3.11.3                         │
  │     ▀███████ ███▄     │ platform: macOS-13.3.1-x86_64-i386-64bit │
  │                 ██    │ packages: sanic-routing==22.8.0          │
  │    ████ ████████▀     │                                          │
  │                       │                                          │
  │ Build Fast. Run Fast. │                                          │
  └───────────────────────┴──────────────────────────────────────────┘

[2023-05-21 21:07:06 -0400] [52986] [WARNING] Sanic is running in PRODUCTION mode. Consider using '--debug' or '--dev' while actively developing your application.
Connection to NATS Server 'localhost' established.
[2023-05-21 21:07:06 -0400] [52991] [INFO] Starting worker [52991]

It can be seen that ‘make api’ is running the standard Sanic app start command underneath.

Step #3 Load Test!

Using Artillery, it is extremely easy to run load tests and get a report showing what effect general load, or sustained load over time, has on response time.

Open a new terminal/console to the root of this project and run artillery quick --count 40 -n 20 http://<HOST>:<HOST_PORT>/ --output report.json && artillery report report.json

my-username@a-computer-name nats-queue-api-python % artillery quick --count 40 -n 20 http://127.0.0.1:8000/ --output report.json && artillery report report.json

artillery quick --count 40 -n 20 http://127.0.0.1:8000/ --output report.json
Log file: report.json
artillery report report.json
Report generated: report.json.html

1 Subscriber in Queue Group ~2 seconds median

Median of ~2000ms (2 seconds) with 40 users sending 20 requests each

3 Subscribers in Queue Group ~500 milliseconds median

Median of ~500ms (1/2 second) with 40 users sending 20 requests each
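One way to reason about why adding subscribers helps: if each subscriber handles one message at a time (an assumption about this setup, not something measured in this post) and a burst of requests is spread randomly across the group, each subscriber's backlog gets shorter. The idealized model below computes response times for such a simultaneous burst; it ignores network and HTTP overhead and is not a reproduction of the Artillery results above.

```python
import random
import statistics
from typing import Optional


def burst_response_times(requests: int, subscribers: int, service_time: float,
                         seed: Optional[int] = None) -> list[float]:
    """Response time of each request in a simultaneous burst under random assignment.

    Each subscriber processes its assigned messages one after another, so its
    k-th message finishes after k * service_time (an idealized assumption).
    """
    rng = random.Random(seed)
    backlog = [0] * subscribers
    times = []
    for _ in range(requests):
        chosen = rng.randrange(subscribers)  # queue group: random member
        backlog[chosen] += 1
        times.append(backlog[chosen] * service_time)
    return times


if __name__ == "__main__":
    for n in (1, 3):
        t = burst_response_times(requests=40, subscribers=n, service_time=0.1, seed=0)
        print(n, "subscriber(s): median", round(statistics.median(t), 2), "s")
```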

As the two reports show, going from one to three subscribers in the Queue Group cut the median response time from ~2 seconds to ~500 milliseconds, which opens many possibilities for targeted scalability. I didn’t expect this feature to perform this well or to be implemented with such ease. All in all, this will definitely be a pattern I use with NATS, and possibly with other connective technologies if supported. Woot!

Take me to the Code!!!
