Quality RTOS & Embedded Software


coreMQTT Agent Demo

Single Threaded Vs Multi Threaded

There are two coreMQTT usage models, single threaded and multithreaded (multitasking). The single threaded model uses the coreMQTT library solely from one thread, and requires you to make repeated, explicit calls into the MQTT library. Multithreaded use cases can instead run the MQTT protocol in the background within an agent (or daemon) task, as shown in the demo documented on this page. When you run the MQTT protocol in an agent task you do not have to explicitly manage any MQTT state or call the MQTT_ProcessLoop() API function. Also, when you use an agent task, multiple application tasks can share a single MQTT connection without the need for synchronization primitives such as mutexes.

Demo Introduction

The coreMQTT Agent demo project uses the FreeRTOS Windows port, so it can be built and evaluated with the free Community version of Visual Studios on Windows, without the need for any MCU hardware.
This demo uses a thread safe queue to hold commands for interacting with the coreMQTT API. There are two tasks to note in this demo:
  • An MQTT agent (main) task processes the commands from the command queue while the other tasks enqueue them. This task enters a loop, during which it processes commands from the command queue. If a termination command is received, it will break out of the loop.
  • A demo subpub task creates a subscription to an MQTT topic, then creates publish operations and pushes them to the command queue. These publish operations are then run by the MQTT agent task. The demo subpub task waits for the publish to complete, indicated by execution of the command completion callback, then enters a short delay before it starts the next publish. This task shows examples of how application tasks would use the coreMQTT Agent API.
For incoming publish messages, the coreMQTT Agent invokes a single callback function. This demo also includes a subscription manager to allow tasks to specify a callback to invoke for incoming publish messages on topics to which they have subscribed. The agent's incoming publish callback in this demo invokes the subscription manager to fan out publishes to any task that has registered a subscription.

The coreMQTT Agent demo can be configured to use either a TLS connection with mutual authentication, or a plaintext TCP connection. By default, the demo uses TLS. If the network unexpectedly disconnects during the demo, then the client will attempt to reconnect with exponential backoff logic. We also recommend that you use mutual authentication in any Internet of Things (IoT) application.

Source Code Organization

The Visual Studio solution for the Multithreaded MQTT demo is called mqtt_multitask_demo.sln and is in the /FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask directory of the main FreeRTOS download.

Click to enlarge

Building the Demo Project

The demo project uses the free community edition of Visual Studio. To build the demo:
  1. Open the mqtt_multitask_demo.sln Visual Studio solution file from within the Visual Studio IDE.
  2. Select Build Solution from the IDE's Build menu.
Note: If you use Microsoft Visual Studio 2017 or earlier, then a Platform Toolset compatible with the Visual Studio version must be selected: Project -> RTOSDemos Properties -> Platform Toolset.

Configuring the Demo Project

The demo uses the FreeRTOS+TCP TCP/IP stack, so follow the instructions provided for the TCP/IP starter project:
  1. Install the pre-requisite components (such as WinPCap).
  2. Optionally set a static or dynamic IP address, gateway address and netmask.
  3. Optionally set a MAC address.
  4. Select an Ethernet network interface]() on the host machine.
  5. ...and most important test the network connection before attempting to run the MQTT demo.
All of these settings should be changed in the MQTT Agent demo project, not the TCP/IP starter project referred to in the pages linked to above! As delivered, the TCP/IP stack is configured to use a dynamic IP address.

Configuring the MQTT Broker Connection

Option 1: TLS with Mutual Authentication (default)

This demo supports the same configuration options as the MQTT Mutual Auth demo. Please see that demo's documentation for all of the available options.

Option 2: Plaintext

To enable quick setup with no certificate configuration required, the demo allows plaintext TCP connections to be used in lieu of mutually authenticated TLS. To disable TLS, The macro democonfigUSE_TLS should be set to 0 in demo_config, or simply not defined at all. Then, the demo may be used with any unencrypted MQTT broker (for example, Eclipse Mosquitto) by following the same instructions as the Plaintext demo.

Important notes:
  • Plain text connections are useful to learn how MQTT works and to debug connections because the MQTT conversation can be viewed in a network sniffer such as WireShark. However, production IoT devices should not use an unencrypted plain text connection, as all traffic is exposed to any device on the network. Never send private data over a plain text connection - we highly recommend that you use encrypted and mutually authenticated connections for all IoT devices.
  • The demo places keys in a header file for demonstration purposes only. We strongly recommend that production devices store keys in a secure location, such as a secure element or enclave. Further, we recommend production IoT devices access keys and other crypto objects via an API that does not expose the keys, such as PKCS #11 or PSA APIs.
Once a broker is decided, the code snippets below describe the compile time configuration constants that configure the MQTT connection. Place any keys required for TLS connections in the same file: demo_config.h, demo_config.h

coreMQTT Agent configuration constants

The following code snippet shows the compile time constants relevant to the coreMQTT Agent, along with their default values. The coreMQTT Agent does not have its own configuration file, but it will use any macros defined in core_mqtt_config.h. Macros defined there will be used in place of these defaults: core_mqtt_config.h


main() initializes the TCP/IP stack before starting the FreeRTOS kernel scheduler. When the network connects, the network event hook creates a single RTOS task. That task optionally creates multiple other tasks, each of which sends and receives MQTT packets of various sizes and at various Quality of Service (QoS) levels. The original thread then becomes the MQTT agent thread.

The following constant defines the created demo tasks. The links in the descriptions go to comments at the top of each implementing source filethat provide more information.
  • #define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE [n]
    Sets the number of instances to create of the task implemented in simple_pub_sub_demo.c.

MQTT Agent Task

The MQTT agent task is responsible for invoking MQTTAgent_CommandLoop(). This task calls the requested coreMQTT API when a command is posted to its queue.

When you invoke a coreMQTT Agent API it creates a command that is sent to the agent task's queue, which is processed in MQTTAgent_CommandLoop(). At time the command is created, optional completion callback and context parameters may be passed. Once the corresponding command is complete, the completion callback will be invoked with the passed context and any return values as a result of the command. The signature for the completion callback is as follows:

typedef void (* MQTTAgentCommandCallback_t )( void * pCmdCallbackContext,
MQTTAgentReturnInfo_t * pReturnInfo );
The command completion context is user-defined; for this demo, it is: struct MQTTAgenCommandContext

Commands are considered completed when:
  • Subscribes, unsubscribes, and publishes with QoS > 0: Once the corresponding acknowledgment packet has been received.
  • All other operations: Once the corresponding coreMQTT API has been invoked.
Any structures used by the command, including publish information, subscription information, and completion contexts, must stay in scope until the command has completed. A calling task must not reuse any of a command's structures before the invocation of the completion callback. Note that since the completion callback is invoked by the MQTT Agent, it will run with the thread context of the agent task, not the task that created the command. Inter-process communication mechanisms, such as task notifications or queues, can be used to signal the calling task of command completion.

Running the Command Loop

Commands are processed continuously in MQTTAgent_CommandLoop(). If there are no commands to be processed, the loop will wait for a maximum of MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME for one to be added to the queue, and, if no command is added, run a single iteration of MQTT_ProcessLoop(). This ensures both that MQTT Keep-Alive is managed, and that any incoming publishes are received even when there are no commands in the queue.
The command loop function will return for the following reasons:
  • A command returns any status code besides MQTTSuccess. The error status is returned by the command loop, so you may decide how to handle it. In this demo, the TCP connection is reestablished, and a reconnect attempt is made. If there is any error, a reconnection can occur in the background without any intervention from other tasks using MQTT.
  • A disconnect command (from MQTTAgent_Disconnect) is processed. The command loop exits so that TCP can be disconnected.
  • A terminate command (from MQTTAgent_Terminate) is processed. This command also marks any command still in the queue or awaiting an acknowledgment packet as an error, with a return code of MQTTRecvFailed.
Here's how these cases are handled in the agent task: prvMQTTAgentTask()

Creating the other Tasks
The MQTT agent task in this demo also creates the subscribe-publish tasks. While this is not necessary for the implementation of an agent task, it is done here as the agent is the main task in this demo: prvConnectAndCreateDemoTasks()

Subscription Manager

Since the demo uses multiple topics, a subscription manager is a convenient way to associate subscribed topics with unique callbacks or tasks. The subscription manager in this demo is single-threaded, so it should not be used by multiple tasks concurrently. In this demo, subscription manager functions are only called from callback functions that are passed to the MQTT agent, and run only with the agent task's thread context. The following types and functions compose the subscription manager: Callback function

Simple subscribe-publish Task

Each instance of the subscribe-publish task creates a subscription to an MQTT topic, and creates publish operations for that topic. To demonstrate multiple publish types, even numbered tasks use QoS 0 (which are complete once the publish packet is sent) and odd tasks use QoS 1 (which are complete upon receipt of a PUBACK packet). The sub-pub tasks are defined as follows: prvSimpleSubscribePublishTask()
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.