Quality RTOS & Embedded Software

  Real time embedded FreeRTOS RSS feed  
NOTE: The AWS IoT Jobs library and documentation are in the FreeRTOS Labs.  The libraries in the FreeRTOS Labs download directory are fully functional, but undergoing optimizations or refactoring to improve memory usage, modularity, documentation, demo usability, or test coverage.  They are available as part of the main download.

Jobs Demo

Introduction

The AWS IoT Jobs Demo shows you how to connect to the AWS IoT Jobs service through an MQTT connection, retrieve a job from AWS IoT, and process it on a device.  The AWS IoT Jobs Demo project uses the FreeRTOS Windows port, so it can be built and evaluated with the free Community version of Visual Studio on Windows without the need for any particular MCU hardware.  The demo establishes a secure connection to the AWS IoT MQTT broker using TLS in the same manner as the MQTT mutual authentication demo.

Source Code Organization

The Visual Studio solution for the Jobs Demo is called jobs_loop_demo.sln and is located in the \FreeRTOS-Plus\Demo\FreeRTOS_IoT_Libraries\jobs\jobs_notify_next directory of the FreeRTOS labs download.

Note: The project is not included in the main FreeRTOS download at this time. It is provided as a separate zip file download.

The Jobs library utilizes the IoT MQTT library and a lightweight JSON parser library.

Configure the Demo Project

The demo uses the FreeRTOS+TCP TCP/IP stack. Follow the instructions provided for the TCP/IP starter project to:

  1. Install the pre-requisite components (such as WinPCap).
  2. Optionally set a static or dynamic IP address, gateway address and netmask.
  3. Optionally set a MAC address.
  4. Select an Ethernet network interface on your host machine.
  5. (Important!) Test your network connection before you run the Jobs demo.

All these settings should be performed in the Jobs Demo project, not in the TCP/IP starter project referenced from the same page. As delivered, the TCP/IP stack is configured to use a dynamic IP address.

Configure the MQTT Broker Connection

In this demo you use an MQTT connection to the AWS IoT MQTT broker. This connection is configured in the same way as the MQTT mutual authentication demo.

Build the Demo Project

The demo project uses the free community edition of Visual Studio

To build the demo:

  1. Open the \FreeRTOS-Plus\Demo\FreeRTOS_IoT_Libraries\jobs\jobs_notify_next\jobs_loop_demo.sln Visual Studio solution file from within the Visual Studio IDE.
  2. Select ‘build solution’ from the IDE’s ‘build’ menu.

 

Functionality

The demo shows the workflow used to receive jobs from AWS IoT and process them on a device. It sets a callback for when a message is received on the “$aws/things/THING_NAME/jobs/notify-next” topic (where THING_NAME is the name of your AWS IoT thing). A message is published to this topic when there is a new job to execute. The demo is interactive and requires you to create jobs using either the AWS IoT console or the AWS CLI. See create-job in the AWS CLI Command Reference for more information about creating a job.

The demo requires the job document to have an “action” key, which can be “print” to print a message to the console, “publish” to republish the message to a topic, or “exit” to exit the demo. The formats for the jobs are as follows:

Document for a print:

{
    "action": "print",
    "message": "INSERT_MESSAGE_HERE"
}

Document for a publish:

{
    "action": "publish",
    "message": "INSERT_MESSAGE_HERE",
    "topic": "topic/name/here"
}

Document for an exit:

{
    "action: "exit"
}

Using the AWS CLI, a job could be created as follows:

aws iot create-job --target-selection SNAPSHOT --targets "THING_ARN_HERE" --document '{ "action": "publish", "message": "Hello World!", "topic": "hello/world" }' --job-id "some_unique_id"

replacing “THING_ARN_HERE” with the ARN of your IoT Thing, and “some_unique_id” with any unique identifier for the job.

Using Python with boto3, the same can be done as:

import boto3

# Each job must have a unique ID.
job_id = 'some_unique_id'
# Replace with the name of your IoT Thing.
thing_name = 'YOUR_THING_NAME_HERE'
job_document = '{ "action": "publish", "message": "Hello World!", "topic": "hello/world" }'

client = boto3.client('iot')
# Get ARN from name of Thing.
thing_arn = client.describe_thing(thingName=thing_name)['thingArn']
client.create_job(jobId=job_id, targets=[ thing_arn ], document=job_document, targetSelection='SNAPSHOT')

Note that this code retrieves the Thing ARN from your Thing Name, so it is not necessary to enter it manually.

 

The main task in the demo blocks on xTaskNotifyWait() until either the exit bit is set in ulNotificationValue, or a preconfigured amount of time passes. This duration is controlled by the jobsexampleMS_BEFORE_EXIT macro. Note that if MQTT keep-alive is disabled, then the connection to AWS IoT will be disconnected after 30 minutes of idleness. Keep-alive is disabled in this demo because of inconsistent timing in the Windows simulator that makes it unreliable.

The structure of the demo is:

static void prvJobsDemoTask( void *pvParameters )
{
IotMqttError_t xResult;
IotNetworkError_t xNetworkInit;
uint32_t ulNotificationValue = 0;
const TickType_t xNoDelay = ( TickType_t ) 0;

	/* Remove compiler warnings about unused parameters. */
	( void ) pvParameters;

	/* The MQTT library needs a task pool, so create the system task pool. */
	xResult = IotTaskPool_CreateSystemTaskPool( &( xTaskPoolParameters ) );
	configASSERT( xResult == IOT_TASKPOOL_SUCCESS );

	/* Initialize the network stack abstraction for FreeRTOS. */
	xNetworkInit = IotNetworkFreeRTOS_Init();
	configASSERT( xNetworkInit == IOT_NETWORK_SUCCESS );

	/* MQTT library must be initialized before it can be used. This is just one
	 * time initialization. */
	xResult = IotMqtt_Init();
	configASSERT( xResult == IOT_MQTT_SUCCESS );

	/* Initialize Jobs library. */
	xResult = AwsIotJobs_Init( jobsexampleUSE_DEFAULT_MQTT_TIMEOUT );
	configASSERT( xResult == AWS_IOT_JOBS_SUCCESS );

	/****************************** Connect. ******************************/
	/* Establish a connection to the AWS IoT MQTT broker. This example connects to
	 * the MQTT broker as specified in awsiotconfigAWS_IOT_ENDPOINT and
	 * awsiotconfigMQTT_BROKER_PORT at the top of this file.
	 */
	configPRINTF( ( "Attempt to connect to %s\r\n", awsiotconfigAWS_IOT_ENDPOINT ) );
	prvMQTTConnect();
	configPRINTF( ( "Connected to %s\r\n", awsiotconfigAWS_IOT_ENDPOINT ) );

	/* Don't expect any notifications to be pending yet. */
	configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );

	configPRINTF( ( "Setting callback for jobs/notify-next\r\n" ) );
	prvSetNotifyNextCallback();

	/* Call DescribeAsync to see if there are any pending jobs. */
	xRequestInfo.mqttConnection = xMQTTConnection;
	xRequestInfo.pThingName = awsiotdemoprofileCLIENT_IDENTIFIER;
	xRequestInfo.thingNameLength = jobsexampleCLIENT_IDENTIFIER_LENGTH;
	xRequestInfo.pJobId = AWS_IOT_JOBS_NEXT_JOB;
	xRequestInfo.jobIdLength = AWS_IOT_JOBS_NEXT_JOB_LENGTH;

	/* Use the same callback as notify-next so any pending jobs will be
	 * executed the same way. */
	xCallbackInfo.function = prvJobsCallback;

	xStatus = AwsIotJobs_DescribeAsync( &xRequestInfo, AWS_IOT_JOBS_NO_EXECUTION_NUMBER, true, 0, &xCallbackInfo, NULL );
	configPRINTF( ( "Describe queued with result %s.\r\n", AwsIotJobs_strerror( xStatus ) ) );

	/* Print out a short user guide to the console. The default logging
	 * limit of 255 characters can be changed in demo_logging.c, but breaking
	 * up the only instance of a 1000+ character string is more practical. */
	configPRINTF( (
					  "\r\n"
					  "/*-----------------------------------------------------------*/\r\n"
					  "\r\n"
					  "The Jobs demo is now ready to accept Jobs.\r\n"
					  "Jobs may be created using the AWS IoT console or AWS CLI.\r\n"
					  "See the following link for more information.\r\n"
					  "\r\n" ) );
	configPRINTF( (
					  "\r"
					  "https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html\r\n"
					  "\r\n"
					  "This demo expects Job documents to have an \"action\" JSON key.\r\n"
					  "The following actions are currently supported:\r\n" ) );
	configPRINTF( (
					  "\r"
					  " - print          \r\n"
					  "   Logs a message to the local console. The Job document must also contain a \"message\".\r\n"
					  "   For example: { \"action\": \"print\", \"message\": \"Hello world!\"} will cause\r\n"
					  "   \"Hello world!\" to be printed on the console.\r\n" ) );
	configPRINTF( (
					  "\r"
					  " - publish        \r\n"
					  "   Publishes a message to an MQTT topic. The Job document must also contain a \"message\" and \"topic\".\r\n" ) );
	configPRINTF( (
					  "\r"
					  "   For example: { \"action\": \"publish\", \"topic\": \"demo/jobs\", \"message\": \"Hello world!\"} will cause\r\n"
					  "   \"Hello world!\" to be published to the topic \"demo/jobs\".\r\n" ) );
	configPRINTF( (
					  "\r"
					  " - exit           \r\n"
					  "   Exits the demo program. This program will run until { \"action\": \"exit\" } is received.\r\n"
					  "\r\n"
					  "/*-----------------------------------------------------------*/\r\n" ) );

	/* Wait for an exit job to be received. If an exit job is not received within
	 * jobsexampleMS_BEFORE_EXIT, exit anyway. This is because we have disabled
	 * keep-alive, and the server will disconnect as after some time. */
	xTaskNotifyWait( 0UL,					   /* Don't clear any bits on entry. */
					 jobsexampleEXIT_BIT,	   /* Clear bit on exit. */
					 &( ulNotificationValue ), /* Obtain the notification value. */
					 pdMS_TO_TICKS( jobsexampleMS_BEFORE_EXIT) );
	/* Check was due to receiving an exit job. */
	if( ( ulNotificationValue & jobsexampleEXIT_BIT ) != jobsexampleEXIT_BIT )
	{
		configPRINTF( ( "Disconnecting as %u milliseconds have elapsed.\r\n", jobsexampleMS_BEFORE_EXIT ) );
	}

	/* Disconnect MQTT gracefully. */
	prvMQTTDisconnect();
	configPRINTF( ( "Disconnected from %s\r\n\r\n", awsiotdemoprofileAWS_ENDPOINT ) );

	/* Wait for the disconnect operation to complete which is informed to us
	 * by the disconnect callback (prvExample_OnDisconnect)by setting
	 * the jobsexampleDISCONNECTED_BIT in this task's notification value. */
	xTaskNotifyWait( 0UL,						  /* Don't clear any bits on entry. */
					 jobsexampleDISCONNECTED_BIT, /* Clear bit on exit. */
					 &( ulNotificationValue ),	  /* Obtain the notification value. */
					 pdMS_TO_TICKS( jobsexampleMQTT_TIMEOUT_MS ) );
	configASSERT( ( ulNotificationValue & jobsexampleDISCONNECTED_BIT ) == jobsexampleDISCONNECTED_BIT );

	configPRINTF( ( "prvJobsDemoTask() completed successfully. Total free heap is %u\r\n", xPortGetFreeHeapSize() ) );
	configPRINTF( ( "Demo completed successfully.\r\n" ) );

	/* Clean up initialized libraries. */
	AwsIotJobs_Cleanup();
	IotMqtt_Cleanup();
	IotNetworkFreeRTOS_Cleanup();

}

Connecting to the AWS IoT MQTT Server

To connect to the AWS IoT MQTT broker, use the same method as prvMQTTConnect() in the MQTT mutual authentication demo.

 

Registering the Notify-Next Callback

The function prvSetNotifyNextCallback() sets the callback for when a message is received on the “$aws/things/THING_NAME/jobs/notify-next” topic that indicates that a new job is ready for execution. The definition is:

static void prvSetNotifyNextCallback( void )
{
AwsIotJobsError_t xCallbackStatus = AWS_IOT_JOBS_SUCCESS;
AwsIotJobsCallbackInfo_t xCallbackInfo = AWS_IOT_JOBS_CALLBACK_INFO_INITIALIZER;

	/* Set the jobs callback function. */
	xCallbackInfo.function = prvJobsCallback;

	/************************ Set notify-next callbacks **********************/

	xCallbackStatus = AwsIotJobs_SetNotifyNextCallback( xMQTTConnection,
							    awsiotdemoprofileCLIENT_IDENTIFIER,
							    jobsexampleCLIENT_IDENTIFIER_LENGTH,
							    0,
							    &xCallbackInfo );

	configASSERT( xCallbackStatus == AWS_IOT_JOBS_SUCCESS );
}

The callback that is set is prvJobsCallback(). It parses the job document and calls the appropriate function to process it. The function definition is:

static void prvJobsCallback( void * pCallbackContext,
							 AwsIotJobsCallbackParam_t * pxCallbackInfo )
{
BaseType_t xIdKeyFound = pdFALSE, xDocKeyFound = pdFALSE;
const char * pcJobId = NULL;
size_t xJobIdLength = 0;
const char * pcJobDoc = NULL;
size_t xJobDocLength = 0;

	/* Silence warnings about unused parameters. */
	( void ) pCallbackContext;

	configASSERT( pxCallbackInfo != NULL );

	/* Get the Job ID. */
	xIdKeyFound = prvGetJsonString( pxCallbackInfo->u.callback.pDocument,
					pxCallbackInfo->u.callback.documentLength,
					jobsexampleID_KEY,
					jobsexampleID_KEY_LENGTH,
					&pcJobId,
					&xJobIdLength );

	if( xIdKeyFound == pdTRUE )
	{
		if( xJobIdLength > jobsexampleID_MAX_LENGTH )
		{
			configPRINTF( ( "Received Job ID %.*s longer than %lu, which is the "
							"maximum allowed by AWS IoT. Ignoring Job.\r\n",
							xJobIdLength,
							pcJobId,
							( unsigned long ) jobsexampleID_MAX_LENGTH ) );

			xIdKeyFound = pdFALSE;
		}
		else
		{
			configPRINTF( ( "Job %.*s received.\r\n", xJobIdLength, pcJobId ) );
		}
	}

	/* Get the Job document. */
	xDocKeyFound = ( BaseType_t ) IotJsonUtils_FindJsonValue(
		pxCallbackInfo->u.callback.pDocument,
		pxCallbackInfo->u.callback.documentLength,
		jobsexampleDOC_KEY,
		jobsexampleDOC_KEY_LENGTH,
		&pcJobDoc,
		&xJobDocLength );

	/* When both the Job ID and Job document are available, process the Job. */
	if( ( xIdKeyFound == pdTRUE ) && ( xDocKeyFound == pdTRUE ) )
	{
		/* Process the Job document. */
		prvProcessJob(  pxCallbackInfo,
				   pcJobId,
				   xJobIdLength,
				   pcJobDoc,
				   xJobDocLength );
	}
	else
	{
		/* The Jobs service sends an empty Job document when all Jobs are complete. */
		if( ( xIdKeyFound == pdFALSE ) && ( xDocKeyFound == pdFALSE ) )
		{
			configPRINTF( (
				"\r\n"
				"/*-----------------------------------------------------------*/\r\n"
				"\r\n"
				"All available Jobs complete.\r\n"
				"\r\n"
				"/*-----------------------------------------------------------*/\r\n"
				"\r\n" ) );
		}
		else
		{
			configPRINTF( ( "Received an invalid Job document: %.*s\r\n",
							pxCallbackInfo->u.callback.documentLength,
							pxCallbackInfo->u.callback.pDocument ) );
		}
	}
}

After searching the job document, the callback logs an error if the document is invalid, prints success if it was invoked due to all available jobs being completed, or calls a function to process the document for this demo.

Processing the Job

The prvProcessJob() function starts the job, performs the appropriate action based on its command, and updates the job’s status on AWS IoT. The function starts the job by calling AwsIotJobs_StartNextAsync(), which lets AWS IoT know the job has been started. AWS IoT changes the status of the job from “queued” to “in progress”. The function then determines whether the requested action was a print, publish, or exit, and takes action accordingly. After this, it calls AwsIotJobs_UpdateAsync() to tell AWS IoT that the job execution has been completed. The function definition is:

static void prvProcessJob( const AwsIotJobsCallbackParam_t * pxJobInfo,
			   const char * pcJobId,
			   size_t xJobIdLength,
			   const char * pcJobDoc,
			   size_t xJobDocLength )
{
AwsIotJobsError_t xStatus = AWS_IOT_JOBS_SUCCESS;
AwsIotJobsUpdateInfo_t xUpdateInfo = AWS_IOT_JOBS_UPDATE_INFO_INITIALIZER;
AwsIotJobsCallbackInfo_t xCallbackInfo = AWS_IOT_JOBS_CALLBACK_INFO_INITIALIZER;
const char * pcAction = NULL;
size_t xActionLength = 0;
_jobAction_t xAction = JOB_ACTION_UNKNOWN;
AwsIotJobsRequestInfo_t xRequestInfo = AWS_IOT_JOBS_REQUEST_INFO_INITIALIZER;

	configPRINTF( ( "Job document received: %.*s\r\n", xJobDocLength, pcJobDoc ) );

	xRequestInfo.mqttConnection = pxJobInfo->mqttConnection;
	xRequestInfo.pThingName = pxJobInfo->pThingName;
	xRequestInfo.thingNameLength = pxJobInfo->thingNameLength;
	xRequestInfo.pJobId = pcJobId;
	xRequestInfo.jobIdLength = xJobIdLength;

	/* Tell the Jobs service that the device has started working on the Job.
	 * Use the StartNext API to set the Job's status to IN_PROGRESS. */
	xCallbackInfo.function = prvOperationCompleteCallback;

	xStatus = AwsIotJobs_StartNextAsync( &xRequestInfo, &xUpdateInfo, 0, &xCallbackInfo, NULL );

	configPRINTF( ( "Jobs StartNext queued with result %s.\r\n", AwsIotJobs_strerror( xStatus ) ) );

	/* Get the action for this device. */
	if( prvGetJsonString( pcJobDoc,
				  xJobDocLength,
				  jobsexampleACTION_KEY,
				  jobsexampleACTION_KEY_LENGTH,
				  &pcAction,
				  &xActionLength ) == pdTRUE )
	{
		xAction = prvGetAction( pcAction, xActionLength );

		switch( xAction )
		{
			case JOB_ACTION_EXIT:
				xCallbackInfo.pCallbackContext = jobsexampleSHOULD_EXIT;
				xUpdateInfo.newStatus = AWS_IOT_JOB_STATE_SUCCEEDED;
				break;

			case JOB_ACTION_PRINT:
			case JOB_ACTION_PUBLISH:
				xUpdateInfo.newStatus = prvProcessMessage( pxJobInfo->mqttConnection,
									   xAction,
									   pcJobDoc,
									   xJobDocLength );
				break;

			default:
				configPRINTF( ( "Received Job document with unknown action %.*s.\r\n",
								xActionLength,
								pcAction ) );

				xUpdateInfo.newStatus = AWS_IOT_JOB_STATE_FAILED;
				break;
		}
	}
	else
	{
		configPRINTF( ( "Received Job document does not contain an %s key.\r\n",
						jobsexampleACTION_KEY ) );

		/* The given Job document is not valid for this demo. */
		xUpdateInfo.newStatus = AWS_IOT_JOB_STATE_FAILED;
	}

	configPRINTF( ( "Setting state of %.*s to %s.\r\n",
					xJobIdLength,
					pcJobId,
					AwsIotJobs_StateName( xUpdateInfo.newStatus ) ) );

	/* Tell the Jobs service that the device has finished the Job. */
	xStatus = AwsIotJobs_UpdateAsync( &xRequestInfo, &xUpdateInfo, 0, &xCallbackInfo, NULL );

	configPRINTF( ( "Jobs Update queued with result %s.\r\n", AwsIotJobs_strerror( xStatus ) ) );
}

The prvOperationCompleteCallback() function is invoked when either StartNext or Update is completed, and prints this information to the console.

Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.