1 /*
2  * FreeRTOS V202212.00
3  * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of
6  * this software and associated documentation files (the "Software"), to deal in
7  * the Software without restriction, including without limitation the rights to
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9  * the Software, and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in all
13  * copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21  *
22  * https://www.FreeRTOS.org
23  * https://github.com/FreeRTOS
24  *
25  */
26 
27 /**
28  * @file JobsDemoExample.c
29  *
30  * @brief Demo for showing use of the Jobs library API. This demo uses the Jobs library
31  * along with the coreMQTT, mbedTLS and FreeRTOS+TCP libraries to communicate with the AWS IoT Jobs service.
32  * Please refer to AWS documentation for more information about AWS IoT Jobs.
33  * https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html
34  *
35  * The Jobs library API provides macros and helper functions for assembling MQTT topics strings,
36  * and for determining whether an incoming MQTT message is related to the AWS IoT Jobs service.
37  * The Jobs library does not depend on an MQTT library, and therefore, the code for MQTT operations
38  * is placed in another file (mqtt_demo_helpers.c) for improving readability of the demo code about using
39  * the Jobs library.
40  *
41  * @note This demo requires setup of an AWS account, provisioning of a Thing resource on the AWS IoT account,
42  * and the creation of Jobs for the Thing resource. Please refer to AWS CLI documentation for more information
43  * in creating a job document.
44  * https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html
45  *
46  * This demo connects to the AWS IoT broker and calls the MQTT APIs of the AWS IoT Jobs service to receive
47  * jobs queued (as JSON documents) for the Thing resource (associated with this demo application) on the cloud,
48  * then executes the jobs and updates the status of the jobs back to the cloud.
49  * The demo expects job documents to have an "action" JSON key. Actions can
50  * be one of "print", "publish", or "exit".
51  * A "print" job logs a message to the local console, and must contain a "message",
52  * e.g. { "action": "print", "message": "Hello World!" }.
53  * A "publish" job publishes a message to an MQTT Topic. The job document must
54  * contain a "message" and "topic" to publish to, e.g.
55  * { "action": "publish", "topic": "demo/jobs", "message": "Hello World!" }.
56  * An "exit" job exits the demo. Sending { "action": "exit" } will end the demo program.
57  */
58 
59 /* Standard includes. */
60 #include <assert.h>
61 #include <stdlib.h>
62 #include <string.h>
63 #include <stdint.h>
64 
65 /* Demo Specific config file. */
66 #include "demo_config.h"
67 
68 /* Kernel includes. */
69 #include "FreeRTOS.h"
70 #include "task.h"
71 #include "queue.h"
72 
73 /* Jobs library header. */
74 #include "jobs.h"
75 
76 /* JSON library includes. */
77 #include "core_json.h"
78 
79 /* Include common MQTT demo helpers. */
80 #include "mqtt_demo_helpers.h"
81 
82 /*------------- Demo configurations -------------------------*/
83 
84 #ifndef democonfigTHING_NAME
85     #error "Please define the Thing resource name, democonfigTHING_NAME, in demo_config.h"
86 #endif
87 
88 /**
89  * @brief The length of #democonfigTHING_NAME.
90  */
91 #define THING_NAME_LENGTH    ( ( uint16_t ) ( sizeof( democonfigTHING_NAME ) - 1 ) )
92 
93 /*-----------------------------------------------------------*/
94 
95 /**
96  * @brief The JSON key of the execution object.
97  *
 * Messages about the next pending job received from the AWS IoT Jobs service
 * are in JSON format. This demo looks up both the job ID and the job document
 * as members of the object stored under this key.
101  */
102 #define jobsexampleEXECUTION_KEY                    "execution"
103 
104 /**
105  * @brief The length of #jobsexampleEXECUTION_KEY.
106  */
107 #define jobsexampleEXECUTION_KEY_LENGTH             ( sizeof( jobsexampleEXECUTION_KEY ) - 1 )
108 
109 /**
110  * @brief The query key to use for searching the Job ID key in message payload
111  * from AWS IoT Jobs service.
112  *
113  * Job documents received from the AWS IoT Jobs service are in JSON format.
114  * All such JSON documents will contain this key, whose value represents the unique
115  * identifier of a Job.
116  */
117 #define jobsexampleQUERY_KEY_FOR_JOB_ID             jobsexampleEXECUTION_KEY  ".jobId"
118 
119 /**
120  * @brief The length of #jobsexampleQUERY_KEY_FOR_JOB_ID.
121  */
122 #define jobsexampleQUERY_KEY_FOR_JOB_ID_LENGTH      ( sizeof( jobsexampleQUERY_KEY_FOR_JOB_ID ) - 1 )
123 
124 /**
 * @brief The query key to use for searching the job document in message payload
 * from AWS IoT Jobs service.
 *
 * Messages about the next pending job received from the AWS IoT Jobs service
 * are in JSON format. The value found with this query key is the job document,
 * i.e. the JSON that tells this demo which action to execute.
131  */
132 #define jobsexampleQUERY_KEY_FOR_JOBS_DOC           jobsexampleEXECUTION_KEY  ".jobDocument"
133 
134 /**
135  * @brief The length of #jobsexampleQUERY_KEY_FOR_JOBS_DOC.
136  */
137 #define jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH    ( sizeof( jobsexampleQUERY_KEY_FOR_JOBS_DOC ) - 1 )
138 
139 /**
140  * @brief The query key to use for searching the Action key in Jobs document
141  * from AWS IoT Jobs service.
142  *
143  * This demo program expects this key to be in the Job document. It is a key
144  * specific to this demo.
145  */
146 #define jobsexampleQUERY_KEY_FOR_ACTION             "action"
147 
148 /**
149  * @brief The length of #jobsexampleQUERY_KEY_FOR_ACTION.
150  */
151 #define jobsexampleQUERY_KEY_FOR_ACTION_LENGTH      ( sizeof( jobsexampleQUERY_KEY_FOR_ACTION ) - 1 )
152 
153 /**
154  * @brief The query key to use for searching the Message key in Jobs document
155  * from AWS IoT Jobs service.
156  *
157  * This demo program expects this key to be in the Job document if the "action"
158  * is either "publish" or "print". It represents the message that should be
159  * published or printed, respectively.
160  */
161 #define jobsexampleQUERY_KEY_FOR_MESSAGE            "message"
162 
163 /**
164  * @brief The length of #jobsexampleQUERY_KEY_FOR_MESSAGE.
165  */
166 #define jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH     ( sizeof( jobsexampleQUERY_KEY_FOR_MESSAGE ) - 1 )
167 
168 /**
169  * @brief The query key to use for searching the topic key in Jobs document
170  * from AWS IoT Jobs service.
171  *
172  * This demo program expects this key to be in the Job document if the "action"
173  * is "publish". It represents the MQTT topic on which the message should be
174  * published.
175  */
176 #define jobsexampleQUERY_KEY_FOR_TOPIC              "topic"
177 
178 /**
179  * @brief The length of #jobsexampleQUERY_KEY_FOR_TOPIC.
180  */
181 #define jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH       ( sizeof( jobsexampleQUERY_KEY_FOR_TOPIC ) - 1 )
182 
183 /**
184  * @brief Utility macro to generate the PUBLISH topic string to the
185  * DescribeJobExecution API of AWS IoT Jobs service for requesting
186  * the next pending job information.
187  *
188  * @param[in] thingName The name of the Thing resource to query for the
189  * next pending job.
190  */
191 #define DESCRIBE_NEXT_JOB_TOPIC( thingName ) \
192     ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_JOBID_NEXT "/" JOBS_API_GETPENDING )
193 
194 /**
195  * @brief Utility macro to generate the subscription topic string for the
196  * NextJobExecutionChanged API of AWS IoT Jobs service that is required
197  * for getting notification about changes in the next pending job in the queue.
198  *
199  * @param[in] thingName The name of the Thing resource to query for the
200  * next pending Job.
201  */
202 #define NEXT_JOB_EXECUTION_CHANGED_TOPIC( thingName ) \
203     ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_NEXTJOBCHANGED )
204 
205 /**
206  * @brief Format a JSON status message.
207  *
208  * @param[in] x one of "IN_PROGRESS", "SUCCEEDED", or "FAILED"
209  */
210 #define MAKE_STATUS_REPORT( x )    "{\"status\":\"" x "\"}"
211 
212 /**
213  * @brief The maximum number of times to run the loop in this demo.
214  *
215  * @note The demo loop is attempted to re-run only if it fails in an iteration.
216  * Once the demo loop succeeds in an iteration, the demo exits successfully.
217  */
218 #ifndef JOBS_MAX_DEMO_LOOP_COUNT
219     #define JOBS_MAX_DEMO_LOOP_COUNT    ( 3 )
220 #endif
221 
222 /**
223  * @brief Time in ticks to wait between retries of the demo loop if
224  * demo loop fails.
225  */
226 #define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS    ( pdMS_TO_TICKS( 5000U ) )
227 
228 /**
229  * @brief Length of the queue to pass Jobs messages to the job handling task.
230  */
231 #define JOBS_MESSAGE_QUEUE_LEN                       ( 10U )
232 
233 /*-----------------------------------------------------------*/
234 
235 /**
236  * @brief Currently supported actions that a job document can specify.
237  */
typedef enum JobActionType
{
    /** "print": log the message carried by the job document. */
    JOB_ACTION_PRINT = 0,

    /** "publish": publish the job's message to the job's MQTT topic. */
    JOB_ACTION_PUBLISH,

    /** "exit": stop the demo. */
    JOB_ACTION_EXIT,

    /** Any action string that is none of the above. */
    JOB_ACTION_UNKNOWN
} JobActionType;
245 
246 /*-----------------------------------------------------------*/
247 
248 /**
249  * @brief Each compilation unit that consumes the NetworkContext must define it.
250  * It should contain a single pointer to the type of your desired transport.
251  * When using multiple transports in the same compilation unit, define this pointer as void *.
252  *
253  * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport.
254  */
struct NetworkContext
{
    /* The single transport pointer held by this context: the TLS transport
     * parameters. */
    TlsTransportParams_t * pParams;
};
259 
260 /*-----------------------------------------------------------*/
261 
262 /**
263  * @brief The MQTT context used for MQTT operation.
264  */
265 static MQTTContext_t xMqttContext;
266 
267 /**
268  * @brief The network context used for mbedTLS operation.
269  */
270 static NetworkContext_t xNetworkContext;
271 
272 /**
273  * @brief The parameters for the network context using mbedTLS operation.
274  */
275 static TlsTransportParams_t xTlsTransportParams;
276 
277 /**
278  * @brief Static buffer used to hold MQTT messages being sent and received.
279  */
280 static uint8_t usMqttConnectionBuffer[ democonfigNETWORK_BUFFER_SIZE ];
281 
282 /**
283  * @brief Static buffer used to hold the job ID of the single job that
284  * is executed at a time in the demo. This buffer allows re-use of the MQTT
285  * connection context for sending status updates of a job while it is being
286  * processed.
287  */
288 static uint8_t usJobIdBuffer[ democonfigNETWORK_BUFFER_SIZE ];
289 
290 /**
291  * @brief Static buffer used to hold the job document of the single job that
292  * is executed at a time in the demo. This buffer allows re-use of the MQTT
293  * connection context for sending status updates of a job while it is being processed.
294  */
295 static uint8_t usJobsDocumentBuffer[ democonfigNETWORK_BUFFER_SIZE ];
296 
/**
 * @brief MQTTFixedBuffer_t descriptor that refers to #usMqttConnectionBuffer
 * and records its size.
 */
300 static MQTTFixedBuffer_t xBuffer =
301 {
302     .pBuffer = usMqttConnectionBuffer,
303     .size    = democonfigNETWORK_BUFFER_SIZE
304 };
305 
306 /**
307  * @brief A global flag which represents whether a job for the "Exit" action
308  * has been received from AWS IoT Jobs service.
309  */
310 static BaseType_t xExitActionJobReceived = pdFALSE;
311 
312 /**
313  * @brief A global flag which represents whether an error was encountered while
314  * executing the demo.
315  *
316  * @note When this flag is set, the demo terminates execution.
317  */
318 static BaseType_t xDemoEncounteredError = pdFALSE;
319 
320 /**
321  * @brief Queue used to pass incoming Jobs messages to a task to handle them.
322  */
323 static QueueHandle_t xJobMessageQueue;
324 
325 /*-----------------------------------------------------------*/
326 
327 /**
328  * @brief Converts a string in a job document to a #JobActionType
329  * value.
330  *
331  * @param[in] pcAction The job action as a string.
332  * @param[in] xActionLength The length of @p pcAction.
333  *
334  * @return A #JobActionType equivalent to the given string.
335  */
336 static JobActionType prvGetAction( const char * pcAction,
337                                    size_t xActionLength );
338 
339 /**
340  * @brief This example uses the MQTT library of the AWS IoT Device SDK for
341  * Embedded C. This is the prototype of the callback function defined by
342  * that library. It will be invoked whenever the MQTT library receives an
343  * incoming message.
344  *
345  * @param[in] pxMqttContext MQTT context pointer.
346  * @param[in] pxPacketInfo Packet Info pointer for the incoming packet.
347  * @param[in] pxDeserializedInfo Deserialized information from the incoming packet.
348  */
349 static void prvEventCallback( MQTTContext_t * pxMqttContext,
350                               MQTTPacketInfo_t * pxPacketInfo,
351                               MQTTDeserializedInfo_t * pxDeserializedInfo );
352 
353 /**
354  * @brief Process payload from NextJobExecutionChanged and DescribeJobExecution
355  * API MQTT topics of AWS IoT Jobs service.
356  *
357  * This handler parses the received payload about the next pending job, identifies
358  * the action requested in the job document, and executes the action.
359  *
 * @param[in] pxPublishInfo Deserialized publish info pointer for the incoming
 * packet.
362  */
363 static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo );
364 
365 /**
366  * @brief Sends an update for a job to the UpdateJobExecution API of the AWS IoT Jobs service.
367  *
368  * @param[in] pcJobId The job ID whose status has to be updated.
369  * @param[in] usJobIdLength The length of the job ID string.
370  * @param[in] pcJobStatusReport The JSON formatted report to send to the AWS IoT Jobs service
371  * to update the status of @p pcJobId.
372  */
373 static void prvSendUpdateForJob( char * pcJobId,
374                                  uint16_t usJobIdLength,
375                                  const char * pcJobStatusReport );
376 
377 /**
378  * @brief Executes a job received from AWS IoT Jobs service and sends an update back to the service.
379  * It parses the received job document, executes the job depending on the job "Action" type, and
380  * sends an update to AWS for the Job.
381  *
382  * @param[in] pcJobId The ID of the job to execute.
383  * @param[in] usJobIdLength The length of the job ID string.
 * @param[in] pcJobDocument The JSON document associated with the @p pcJobId job
 * that is to be processed.
 * @param[in] uxJobDocumentLength The length of the job document.
387  */
388 static void prvProcessJobDocument( char * pcJobId,
389                                    uint16_t usJobIdLength,
390                                    char * pcJobDocument,
391                                    size_t uxJobDocumentLength );
392 
393 /**
394  * @brief The task used to demonstrate the Jobs library API.
395  *
396  * @param[in] pvParameters Parameters as passed at the time of task creation.
397  * Not used in this example.
398  */
399 void prvJobsDemoTask( void * pvParameters );
400 
401 /*-----------------------------------------------------------*/
402 
403 extern BaseType_t xPlatformIsNetworkUp( void );
404 
405 /*-----------------------------------------------------------*/
406 
prvGetAction(const char * pcAction,size_t xActionLength)407 static JobActionType prvGetAction( const char * pcAction,
408                                    size_t xActionLength )
409 {
410     JobActionType xAction = JOB_ACTION_UNKNOWN;
411 
412     configASSERT( pcAction != NULL );
413 
414     if( strncmp( pcAction, "print", xActionLength ) == 0 )
415     {
416         xAction = JOB_ACTION_PRINT;
417     }
418     else if( strncmp( pcAction, "publish", xActionLength ) == 0 )
419     {
420         xAction = JOB_ACTION_PUBLISH;
421     }
422     else if( strncmp( pcAction, "exit", xActionLength ) == 0 )
423     {
424         xAction = JOB_ACTION_EXIT;
425     }
426 
427     return xAction;
428 }
429 
prvSendUpdateForJob(char * pcJobId,uint16_t usJobIdLength,const char * pcJobStatusReport)430 static void prvSendUpdateForJob( char * pcJobId,
431                                  uint16_t usJobIdLength,
432                                  const char * pcJobStatusReport )
433 {
434     char pUpdateJobTopic[ JOBS_API_MAX_LENGTH( THING_NAME_LENGTH ) ];
435     size_t ulTopicLength = 0;
436     JobsStatus_t xStatus = JobsSuccess;
437 
438     configASSERT( ( pcJobId != NULL ) && ( usJobIdLength > 0 ) );
439     configASSERT( pcJobStatusReport != NULL );
440 
441     /* Generate the PUBLISH topic string for the UpdateJobExecution API of AWS IoT Jobs service. */
442     xStatus = Jobs_Update( pUpdateJobTopic,
443                            sizeof( pUpdateJobTopic ),
444                            democonfigTHING_NAME,
445                            THING_NAME_LENGTH,
446                            pcJobId,
447                            usJobIdLength,
448                            &ulTopicLength );
449 
450     if( xStatus == JobsSuccess )
451     {
452         if( xPublishToTopic( &xMqttContext,
453                              pUpdateJobTopic,
454                              ulTopicLength,
455                              pcJobStatusReport,
456                              strlen( pcJobStatusReport ) ) == pdFALSE )
457         {
458             /* Set global flag to terminate demo as PUBLISH operation to update job status failed. */
459             xDemoEncounteredError = pdTRUE;
460 
461             LogError( ( "Failed to update the status of job: JobID=%.*s, NewStatePayload=%s",
462                         usJobIdLength, pcJobId, pcJobStatusReport ) );
463         }
464     }
465     else
466     {
467         /* Set global flag to terminate demo as topic generation for UpdateJobExecution API failed. */
468         xDemoEncounteredError = pdTRUE;
469 
470         LogError( ( "Failed to generate Publish topic string for sending job update: "
471                     "JobID=%.*s, NewStatePayload=%s",
472                     usJobIdLength, pcJobId, pcJobStatusReport ) );
473     }
474 }
475 
prvProcessJobDocument(char * pcJobId,uint16_t usJobIdLength,char * pcJobDocument,size_t uxJobDocumentLength)476 static void prvProcessJobDocument( char * pcJobId,
477                                    uint16_t usJobIdLength,
478                                    char * pcJobDocument,
479                                    size_t uxJobDocumentLength )
480 {
481     char * pcAction = NULL;
482     size_t uActionLength = 0U;
483     JSONStatus_t xJsonStatus = JSONSuccess;
484 
485     configASSERT( pcJobId != NULL );
486     configASSERT( usJobIdLength > 0 );
487     configASSERT( pcJobDocument != NULL );
488     configASSERT( uxJobDocumentLength > 0 );
489 
490     xJsonStatus = JSON_Search( pcJobDocument,
491                                uxJobDocumentLength,
492                                jobsexampleQUERY_KEY_FOR_ACTION,
493                                jobsexampleQUERY_KEY_FOR_ACTION_LENGTH,
494                                &pcAction,
495                                &uActionLength );
496 
497     if( xJsonStatus != JSONSuccess )
498     {
499         LogError( ( "Job document schema is invalid. Missing expected \"action\" key in document." ) );
500         prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
501     }
502     else
503     {
504         JobActionType xActionType = JOB_ACTION_UNKNOWN;
505         char * pcMessage = NULL;
506         size_t ulMessageLength = 0U;
507 
508         xActionType = prvGetAction( pcAction, uActionLength );
509 
510         switch( xActionType )
511         {
512             case JOB_ACTION_EXIT:
513                 LogInfo( ( "Received job contains \"exit\" action. Updating state of demo." ) );
514                 xExitActionJobReceived = pdTRUE;
515                 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
516                 break;
517 
518             case JOB_ACTION_PRINT:
519                 LogInfo( ( "Received job contains \"print\" action." ) );
520 
521                 xJsonStatus = JSON_Search( pcJobDocument,
522                                            uxJobDocumentLength,
523                                            jobsexampleQUERY_KEY_FOR_MESSAGE,
524                                            jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
525                                            &pcMessage,
526                                            &ulMessageLength );
527 
528                 if( xJsonStatus == JSONSuccess )
529                 {
530                     /* Print the given message if the action is "print". */
531                     LogInfo( ( "\r\n"
532                                "/*-----------------------------------------------------------*/\r\n"
533                                "\r\n"
534                                "%.*s\r\n"
535                                "\r\n"
536                                "/*-----------------------------------------------------------*/\r\n"
537                                "\r\n", ulMessageLength, pcMessage ) );
538                     prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
539                 }
540                 else
541                 {
542                     LogError( ( "Job document schema is invalid. Missing \"message\" for \"print\" action type." ) );
543                     prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
544                 }
545 
546                 break;
547 
548             case JOB_ACTION_PUBLISH:
549                 LogInfo( ( "Received job contains \"publish\" action." ) );
550                 char * pcTopic = NULL;
551                 size_t ulTopicLength = 0U;
552 
553                 xJsonStatus = JSON_Search( pcJobDocument,
554                                            uxJobDocumentLength,
555                                            jobsexampleQUERY_KEY_FOR_TOPIC,
556                                            jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH,
557                                            &pcTopic,
558                                            &ulTopicLength );
559 
560                 /* Search for "topic" key in the Jobs document.*/
561                 if( xJsonStatus != JSONSuccess )
562                 {
563                     LogError( ( "Job document schema is invalid. Missing \"topic\" key for \"publish\" action type." ) );
564                     prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
565                 }
566                 else
567                 {
568                     xJsonStatus = JSON_Search( pcJobDocument,
569                                                uxJobDocumentLength,
570                                                jobsexampleQUERY_KEY_FOR_MESSAGE,
571                                                jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
572                                                &pcMessage,
573                                                &ulMessageLength );
574 
575                     /* Search for "message" key in Jobs document.*/
576                     if( xJsonStatus == JSONSuccess )
577                     {
578                         /* Publish to the parsed MQTT topic with the message obtained from
579                          * the Jobs document.*/
580                         if( xPublishToTopic( &xMqttContext,
581                                              pcTopic,
582                                              ulTopicLength,
583                                              pcMessage,
584                                              ulMessageLength ) == pdFALSE )
585                         {
586                             /* Set global flag to terminate demo as PUBLISH operation to execute job failed. */
587                             xDemoEncounteredError = pdTRUE;
588 
589                             LogError( ( "Failed to execute job with \"publish\" action: Failed to publish to topic. "
590                                         "JobID=%.*s, Topic=%.*s",
591                                         usJobIdLength, pcJobId, ulTopicLength, pcTopic ) );
592                         }
593 
594                         prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
595                     }
596                     else
597                     {
598                         LogError( ( "Job document schema is invalid. Missing \"message\" key for \"publish\" action type." ) );
599                         prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
600                     }
601                 }
602 
603                 break;
604 
605             default:
606                 configPRINTF( ( "Received Job document with unknown action %.*s.",
607                                 uActionLength, pcAction ) );
608                 break;
609         }
610     }
611 }
612 
/*
 * Handle a message that describes the next pending job.
 *
 * The payload is validated as JSON, then the job ID and the job document are
 * looked up in it. Both are copied into the file-scope buffers usJobIdBuffer
 * and usJobsDocumentBuffer before prvProcessJobDocument() is called on the
 * copies.
 *
 * The payload comes from the network, so the lengths reported by JSON_Search
 * are checked at run time before anything is copied: a message whose job ID
 * or job document is empty or does not fit is logged and dropped.
 *
 * pxPublishInfo  Publish info of the incoming message; must carry a payload.
 */
static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
{
    char * pcPayload = NULL;
    size_t uxPayloadLength = 0U;
    char * pcJobId = NULL;
    size_t ulJobIdLength = 0U;

    configASSERT( pxPublishInfo != NULL );
    configASSERT( ( pxPublishInfo->pPayload != NULL ) && ( pxPublishInfo->payloadLength > 0 ) );

    pcPayload = ( char * ) pxPublishInfo->pPayload;
    uxPayloadLength = pxPublishInfo->payloadLength;

    /* Check validity of JSON message response from server. */
    if( JSON_Validate( pcPayload, uxPayloadLength ) != JSONSuccess )
    {
        LogError( ( "Received invalid JSON payload from AWS IoT Jobs service" ) );
    }
    /* Parse the Job ID of the next pending job execution from the JSON payload. */
    else if( JSON_Search( pcPayload,
                          uxPayloadLength,
                          jobsexampleQUERY_KEY_FOR_JOB_ID,
                          jobsexampleQUERY_KEY_FOR_JOB_ID_LENGTH,
                          &pcJobId,
                          &ulJobIdLength ) != JSONSuccess )
    {
        /* The precision of "%.*s" must be an int and "%s" expects a char
         * pointer, hence the casts. */
        LogWarn( ( "Failed to parse Job ID in message received from AWS IoT Jobs service: "
                   "IncomingTopic=%.*s, Payload=%.*s",
                   ( int ) pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName,
                   ( int ) uxPayloadLength, ( const char * ) pcPayload ) );
    }
    /* A job ID of exactly JOBS_JOBID_MAX_LENGTH characters is accepted; the
     * ID must also fit the buffer it is copied to. */
    else if( ( ulJobIdLength == 0U ) ||
             ( ulJobIdLength > JOBS_JOBID_MAX_LENGTH ) ||
             ( ulJobIdLength > sizeof( usJobIdBuffer ) ) )
    {
        LogError( ( "Job ID in message received from AWS IoT Jobs service has an invalid length: "
                    "Length=%lu",
                    ( unsigned long ) ulJobIdLength ) );
    }
    else
    {
        char * pcJobDocLoc = NULL;
        size_t ulJobDocLength = 0U;

        LogInfo( ( "Received a Job from AWS IoT Jobs service: JobId=%.*s",
                   ( int ) ulJobIdLength, pcJobId ) );

        /* Copy the Job ID in the global buffer. This is done so that
         * the MQTT context's network buffer can be used for sending jobs
         * status updates to the AWS IoT Jobs service. */
        memcpy( usJobIdBuffer, pcJobId, ulJobIdLength );

        /* Search for the jobs document in the payload. */
        if( JSON_Search( pcPayload,
                         uxPayloadLength,
                         jobsexampleQUERY_KEY_FOR_JOBS_DOC,
                         jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH,
                         &pcJobDocLoc,
                         &ulJobDocLength ) != JSONSuccess )
        {
            LogWarn( ( "Failed to parse document of next job received from AWS IoT Jobs service: "
                       "Topic=%.*s, JobID=%.*s",
                       ( int ) pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName,
                       ( int ) ulJobIdLength, pcJobId ) );
        }
        else if( ( ulJobDocLength == 0U ) ||
                 ( ulJobDocLength > sizeof( usJobsDocumentBuffer ) ) )
        {
            LogError( ( "Document of next job received from AWS IoT Jobs service has an invalid length: "
                        "JobID=%.*s, Length=%lu",
                        ( int ) ulJobIdLength, pcJobId,
                        ( unsigned long ) ulJobDocLength ) );
        }
        else
        {
            /* Copy the Job document in buffer. This is done so that the MQTT connection buffer can
             * be used for sending jobs status updates to the AWS IoT Jobs service. */
            memcpy( usJobsDocumentBuffer, pcJobDocLoc, ulJobDocLength );

            /* Process the Job document and execute the job. The buffers are
             * byte arrays, so they are cast to the char pointers expected by
             * the callee. */
            prvProcessJobDocument( ( char * ) usJobIdBuffer,
                                   ( uint16_t ) ulJobIdLength,
                                   ( char * ) usJobsDocumentBuffer,
                                   ulJobDocLength );
        }
    }
}
683 
684 /*-----------------------------------------------------------*/
685 
686 /* This is the callback function invoked by the MQTT stack when it receives
687  * incoming messages. This function demonstrates how to use the Jobs_MatchTopic
688  * function to determine whether the incoming message is a Jobs message
689  * or not. If it is, it handles the message depending on the message type.
690  */
prvEventCallback(MQTTContext_t * pxMqttContext,MQTTPacketInfo_t * pxPacketInfo,MQTTDeserializedInfo_t * pxDeserializedInfo)691 static void prvEventCallback( MQTTContext_t * pxMqttContext,
692                               MQTTPacketInfo_t * pxPacketInfo,
693                               MQTTDeserializedInfo_t * pxDeserializedInfo )
694 {
695     uint16_t usPacketIdentifier;
696 
697     ( void ) pxMqttContext;
698 
699     configASSERT( pxDeserializedInfo != NULL );
700     configASSERT( pxMqttContext != NULL );
701     configASSERT( pxPacketInfo != NULL );
702 
703     usPacketIdentifier = pxDeserializedInfo->packetIdentifier;
704 
705     /* Handle incoming publish. The lower 4 bits of the publish packet
706      * type is used for the dup, QoS, and retain flags. Hence masking
707      * out the lower bits to check if the packet is publish. */
708     if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
709     {
710         configASSERT( pxDeserializedInfo->pPublishInfo != NULL );
711         JobsTopic_t topicType = JobsMaxTopic;
712         JobsStatus_t xStatus = JobsError;
713 
714         LogDebug( ( "Received an incoming publish message: TopicName=%.*s",
715                     pxDeserializedInfo->pPublishInfo->topicNameLength,
716                     ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
717 
718         /* Let the Jobs library tell us whether this is a Jobs message. */
719         xStatus = Jobs_MatchTopic( ( char * ) pxDeserializedInfo->pPublishInfo->pTopicName,
720                                    pxDeserializedInfo->pPublishInfo->topicNameLength,
721                                    democonfigTHING_NAME,
722                                    THING_NAME_LENGTH,
723                                    &topicType,
724                                    NULL,
725                                    NULL );
726 
727         if( xStatus == JobsSuccess )
728         {
729             /* Upon successful return, the messageType has been filled in. */
730             if( ( topicType == JobsDescribeSuccess ) || ( topicType == JobsNextJobChanged ) )
731             {
732                 MQTTPublishInfo_t * pxJobMessagePublishInfo = NULL;
733                 char * pcTopicName = NULL;
734                 char * pcPayload = NULL;
735 
736                 /* Copy message to pass into queue. */
737                 pxJobMessagePublishInfo = ( MQTTPublishInfo_t * ) pvPortMalloc( sizeof( MQTTPublishInfo_t ) );
738                 pcTopicName = ( char * ) pvPortMalloc( pxDeserializedInfo->pPublishInfo->topicNameLength );
739                 pcPayload = ( char * ) pvPortMalloc( pxDeserializedInfo->pPublishInfo->payloadLength );
740 
741                 if( ( pxJobMessagePublishInfo == NULL ) || ( pcTopicName == NULL ) || ( pcPayload == NULL ) )
742                 {
743                     LogError( ( "Malloc failed for copying job publish info." ) );
744 
745                     if( pxJobMessagePublishInfo != NULL )
746                     {
747                         vPortFree( pxJobMessagePublishInfo );
748                     }
749 
750                     if( pcTopicName != NULL )
751                     {
752                         vPortFree( pcTopicName );
753                     }
754 
755                     if( pcPayload != NULL )
756                     {
757                         vPortFree( pcPayload );
758                     }
759                 }
760                 else
761                 {
762                     memcpy( pxJobMessagePublishInfo, pxDeserializedInfo->pPublishInfo, sizeof( MQTTPublishInfo_t ) );
763                     memcpy( pcTopicName, pxDeserializedInfo->pPublishInfo->pTopicName, pxDeserializedInfo->pPublishInfo->topicNameLength );
764                     memcpy( pcPayload, pxDeserializedInfo->pPublishInfo->pPayload, pxDeserializedInfo->pPublishInfo->payloadLength );
765 
766                     pxJobMessagePublishInfo->pTopicName = pcTopicName;
767                     pxJobMessagePublishInfo->pPayload = pcPayload;
768 
769                     if( xQueueSend( xJobMessageQueue, &pxJobMessagePublishInfo, 0 ) == errQUEUE_FULL )
770                     {
771                         LogError( ( "Could not enqueue Jobs message." ) );
772 
773                         vPortFree( pxJobMessagePublishInfo );
774                         vPortFree( pcTopicName );
775                         vPortFree( pcPayload );
776                     }
777                 }
778             }
779             else if( topicType == JobsUpdateSuccess )
780             {
781                 LogInfo( ( "Job update status request has been accepted by AWS Iot Jobs service." ) );
782             }
783             else if( topicType == JobsStartNextFailed )
784             {
785                 LogWarn( ( "Request for next job description rejected: RejectedResponse=%.*s.",
786                            pxDeserializedInfo->pPublishInfo->payloadLength,
787                            ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
788             }
789             else if( topicType == JobsUpdateFailed )
790             {
791                 /* Set the global flag to terminate the demo, because the request for updating and executing the job status
792                  * has been rejected by the AWS IoT Jobs service. */
793                 xDemoEncounteredError = pdTRUE;
794 
795                 LogWarn( ( "Request for job update rejected: RejectedResponse=%.*s.",
796                            pxDeserializedInfo->pPublishInfo->payloadLength,
797                            ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
798 
799                 LogError( ( "Terminating demo as request to update job status has been rejected by "
800                             "AWS IoT Jobs service..." ) );
801             }
802             else
803             {
804                 LogWarn( ( "Received an unexpected messages from AWS IoT Jobs service: "
805                            "JobsTopicType=%u", topicType ) );
806             }
807         }
808         else if( xStatus == JobsNoMatch )
809         {
810             LogWarn( ( "Incoming message topic does not belong to AWS IoT Jobs!: topic=%.*s",
811                        pxDeserializedInfo->pPublishInfo->topicNameLength,
812                        ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
813         }
814         else
815         {
816             LogError( ( "Failed to parse incoming publish job. Topic=%.*s!",
817                         pxDeserializedInfo->pPublishInfo->topicNameLength,
818                         ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
819         }
820     }
821     else
822     {
823         vHandleOtherIncomingPacket( pxPacketInfo, usPacketIdentifier );
824     }
825 }
826 
827 /*-----------------------------------------------------------*/
828 
829 /**
830  * @brief Entry point of the Jobs demo.
831  *
832  * This main function demonstrates how to use the Jobs library API library.
833  *
834  * This demo uses helper functions for MQTT operations that have internal
835  * loops to process incoming messages. Those are not the focus of this demo
836  * and therefore, are placed in a separate file mqtt_demo_helpers.c.
837  *
838  * This function also shows that the communication with the AWS IoT Jobs services does
839  * not require explicit subscriptions to the response MQTT topics for request commands that
840  * sent to the MQTT APIs (like DescribeJobExecution API) of the service. The service
841  * will send messages on the response topics for the request commands on the same
842  * MQTT connection irrespective of whether the client subscribes to the response topics.
843  * Therefore, this demo processes incoming messages from response topics of DescribeJobExecution
844  * and UpdateJobExecution APIs without explicitly subscribing to the topics.
845  */
prvJobsDemoTask(void * pvParameters)846 void prvJobsDemoTask( void * pvParameters )
847 {
848     BaseType_t xDemoStatus = pdPASS;
849     UBaseType_t uxDemoRunCount = 0UL;
850     BaseType_t retryDemoLoop = pdFALSE;
851 
852     /* Remove compiler warnings about unused parameters. */
853     ( void ) pvParameters;
854 
855     /* Set the pParams member of the network context with desired transport. */
856     xNetworkContext.pParams = &xTlsTransportParams;
857 
858     /* Initialize Jobs message queue. */
859     xJobMessageQueue = xQueueCreate( JOBS_MESSAGE_QUEUE_LEN, sizeof( MQTTPublishInfo_t * ) );
860     configASSERT( xJobMessageQueue != NULL );
861 
862     /* This demo runs a single loop unless there are failures in the demo execution.
863      * In case of failures in the demo execution, demo loop will be retried for up to
864      * JOBS_MAX_DEMO_LOOP_COUNT times. */
865     do
866     {
867         LogInfo( ( "---------STARTING DEMO---------\r\n" ) );
868 
869         if( xPlatformIsNetworkUp() == pdFALSE )
870         {
871             LogInfo( ( "Waiting for the network link up event..." ) );
872 
873             while( xPlatformIsNetworkUp() == pdFALSE )
874             {
875                 vTaskDelay( pdMS_TO_TICKS( 1000U ) );
876             }
877         }
878 
879         /* Establish an MQTT connection with AWS IoT over a mutually authenticated TLS session. */
880         xDemoStatus = xEstablishMqttSession( &xMqttContext,
881                                              &xNetworkContext,
882                                              &xBuffer,
883                                              prvEventCallback );
884 
885         if( xDemoStatus == pdFAIL )
886         {
887             /* Log error to indicate connection failure. */
888             LogError( ( "Failed to connect to AWS IoT broker." ) );
889         }
890         else
891         {
892             /* Print out a short user guide to the console. The default logging
893              * limit of 255 characters can be changed in demo_logging.c, but breaking
894              * up the only instance of a 1000+ character string is more practical. */
895             LogInfo( ( "\r\n"
896                        "/*-----------------------------------------------------------*/\r\n"
897                        "\r\n"
898                        "The Jobs demo is now ready to accept Jobs.\r\n"
899                        "Jobs may be created using the AWS IoT console or AWS CLI.\r\n"
900                        "See the following link for more information.\r\n" ) );
901             LogInfo( ( "\r"
902                        "https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html\r\n"
903                        "\r\n"
904                        "This demo expects Job documents to have an \"action\" JSON key.\r\n"
905                        "The following actions are currently supported:\r\n" ) );
906             LogInfo( ( "\r"
907                        " - print          \r\n"
908                        "   Logs a message to the local console. The Job document must also contain a \"message\".\r\n"
909                        "   For example: { \"action\": \"print\", \"message\": \"Hello world!\"} will cause\r\n"
910                        "   \"Hello world!\" to be printed on the console.\r\n" ) );
911             LogInfo( ( "\r"
912                        " - publish        \r\n"
913                        "   Publishes a message to an MQTT topic. The Job document must also contain a \"message\" and \"topic\".\r\n" ) );
914             LogInfo( ( "\r"
915                        "   For example: { \"action\": \"publish\", \"topic\": \"demo/jobs\", \"message\": \"Hello world!\"} will cause\r\n"
916                        "   \"Hello world!\" to be published to the topic \"demo/jobs\".\r\n" ) );
917             LogInfo( ( "\r"
918                        " - exit           \r\n"
919                        "   Exits the demo program. This program will run until { \"action\": \"exit\" } is received.\r\n"
920                        "\r\n"
921                        "/*-----------------------------------------------------------*/\r\n" ) );
922 
923             /* Subscribe to the NextJobExecutionChanged API topic to receive notifications about the next pending
924              * job in the queue for the Thing resource used by this demo. */
925             if( xSubscribeToTopic( &xMqttContext,
926                                    NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
927                                    sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
928             {
929                 xDemoStatus = pdFAIL;
930                 LogError( ( "Failed to subscribe to NextJobExecutionChanged API of AWS IoT Jobs service: Topic=%s",
931                             NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
932             }
933         }
934 
935         if( xDemoStatus == pdPASS )
936         {
937             /* Publish to AWS IoT Jobs on the DescribeJobExecution API to request the next pending job.
938              *
939              * Note: It is not required to make MQTT subscriptions to the response topics of the
940              * DescribeJobExecution API because the AWS IoT Jobs service sends responses for
941              * the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
942              * to the response topics or not.
943              * This demo processes incoming messages from the response topics of the API in the prvEventCallback()
944              * handler that is supplied to the coreMQTT library. */
945             if( xPublishToTopic( &xMqttContext,
946                                  DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
947                                  sizeof( DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
948                                  NULL,
949                                  0 ) != pdPASS )
950             {
951                 xDemoStatus = pdFAIL;
952                 LogError( ( "Failed to publish to DescribeJobExecution API of AWS IoT Jobs service: "
953                             "Topic=%s", DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
954             }
955         }
956 
957         /* Keep on running the demo until we receive a job for the "exit" action to exit the demo. */
958         while( ( xExitActionJobReceived == pdFALSE ) &&
959                ( xDemoEncounteredError == pdFALSE ) &&
960                ( xDemoStatus == pdPASS ) )
961         {
962             MQTTPublishInfo_t * pxJobMessagePublishInfo;
963             MQTTStatus_t xMqttStatus = MQTTSuccess;
964 
965             /* Check if we have notification for the next pending job in the queue from the
966              * NextJobExecutionChanged API of the AWS IoT Jobs service. */
967             xMqttStatus = MQTT_ProcessLoop( &xMqttContext );
968 
969             /* Receive any incoming Jobs message. */
970             if( xQueueReceive( xJobMessageQueue, &pxJobMessagePublishInfo, 0 ) == pdTRUE )
971             {
972                 /* Handler function to process Jobs message payload. */
973                 prvNextJobHandler( pxJobMessagePublishInfo );
974                 vPortFree( ( void * ) ( pxJobMessagePublishInfo->pTopicName ) );
975                 vPortFree( ( void * ) ( pxJobMessagePublishInfo->pPayload ) );
976                 vPortFree( pxJobMessagePublishInfo );
977             }
978 
979             if( xMqttStatus != MQTTSuccess )
980             {
981                 xDemoStatus = pdFAIL;
982                 LogError( ( "Failed to receive notification about next pending job: "
983                             "MQTT_ProcessLoop failed" ) );
984             }
985         }
986 
987         /* Increment the demo run count. */
988         uxDemoRunCount++;
989 
990         /* Retry demo loop only if there is a failure before completing
991          * the processing of any pending jobs. Any failure in MQTT unsubscribe
992          * or disconnect is considered a failure in demo execution and retry
993          * will not be attempted since a retry without any pending jobs will
994          * make this demo indefinitely wait. */
995         if( ( xDemoStatus == pdFAIL ) || ( xDemoEncounteredError == pdTRUE ) )
996         {
997             if( uxDemoRunCount < JOBS_MAX_DEMO_LOOP_COUNT )
998             {
999                 LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
1000                 retryDemoLoop = pdTRUE;
1001             }
1002             else
1003             {
1004                 LogError( ( "All %d demo iterations failed.", JOBS_MAX_DEMO_LOOP_COUNT ) );
1005                 retryDemoLoop = pdFALSE;
1006             }
1007         }
1008         else
1009         {
1010             /* Reset the flag for demo retry. */
1011             retryDemoLoop = pdFALSE;
1012         }
1013 
1014         /* Unsubscribe from the NextJobExecutionChanged API topic. */
1015         if( xUnsubscribeFromTopic( &xMqttContext,
1016                                    NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
1017                                    sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
1018         {
1019             xDemoStatus = pdFAIL;
1020             LogError( ( "Failed to subscribe unsubscribe from the NextJobExecutionChanged API of AWS IoT Jobs service: "
1021                         "Topic=%s", NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
1022         }
1023 
1024         /* Disconnect the MQTT and network connections with AWS IoT. */
1025         if( xDisconnectMqttSession( &xMqttContext, &xNetworkContext ) != pdPASS )
1026         {
1027             xDemoStatus = pdFAIL;
1028             LogError( ( "Disconnection from AWS IoT failed..." ) );
1029         }
1030 
1031         /* Add a delay if a retry is required. */
1032         if( retryDemoLoop == pdTRUE )
1033         {
1034             /* Clear the flag that indicates that indicates the demo error
1035              * before attempting a retry. */
1036             xDemoEncounteredError = pdFALSE;
1037 
1038             LogInfo( ( "A short delay before the next demo iteration." ) );
1039             vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
1040         }
1041     } while( retryDemoLoop == pdTRUE );
1042 
1043     if( ( xDemoEncounteredError == pdFALSE ) && ( xDemoStatus == pdPASS ) )
1044     {
1045         LogInfo( ( "Demo completed successfully." ) );
1046     }
1047 
1048     LogInfo( ( "-------DEMO FINISHED-------\r\n" ) );
1049 
1050     /* Delete this demo task. */
1051     LogInfo( ( "Deleting Jobs Demo task." ) );
1052     vTaskDelete( NULL );
1053 }
1054