1 /*
2 * FreeRTOS V202212.00
3 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
6 * this software and associated documentation files (the "Software"), to deal in
7 * the Software without restriction, including without limitation the rights to
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9 * the Software, and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 *
22 * https://www.FreeRTOS.org
23 * https://github.com/FreeRTOS
24 *
25 */
26
27 /**
28 * @file JobsDemoExample.c
29 *
30 * @brief Demo for showing use of the Jobs library API. This demo uses the Jobs library
31 * along with the coreMQTT, mbedTLS and FreeRTOS+TCP libraries to communicate with the AWS IoT Jobs service.
32 * Please refer to AWS documentation for more information about AWS IoT Jobs.
33 * https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html
34 *
35 * The Jobs library API provides macros and helper functions for assembling MQTT topics strings,
36 * and for determining whether an incoming MQTT message is related to the AWS IoT Jobs service.
37 * The Jobs library does not depend on an MQTT library, and therefore, the code for MQTT operations
38 * is placed in another file (mqtt_demo_helpers.c) for improving readability of the demo code about using
39 * the Jobs library.
40 *
41 * @note This demo requires setup of an AWS account, provisioning of a Thing resource on the AWS IoT account,
42 * and the creation of Jobs for the Thing resource. Please refer to AWS CLI documentation for more information
43 * in creating a job document.
44 * https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html
45 *
46 * This demo connects to the AWS IoT broker and calls the MQTT APIs of the AWS IoT Jobs service to receive
47 * jobs queued (as JSON documents) for the Thing resource (associated with this demo application) on the cloud,
48 * then executes the jobs and updates the status of the jobs back to the cloud.
49 * The demo expects job documents to have an "action" JSON key. Actions can
50 * be one of "print", "publish", or "exit".
51 * A "print" job logs a message to the local console, and must contain a "message",
52 * e.g. { "action": "print", "message": "Hello World!" }.
53 * A "publish" job publishes a message to an MQTT Topic. The job document must
54 * contain a "message" and "topic" to publish to, e.g.
55 * { "action": "publish", "topic": "demo/jobs", "message": "Hello World!" }.
56 * An "exit" job exits the demo. Sending { "action": "exit" } will end the demo program.
57 */
58
59 /* Standard includes. */
60 #include <assert.h>
61 #include <stdlib.h>
62 #include <string.h>
63 #include <stdint.h>
64
65 /* Demo Specific config file. */
66 #include "demo_config.h"
67
68 /* Kernel includes. */
69 #include "FreeRTOS.h"
70 #include "task.h"
71 #include "queue.h"
72
73 /* Jobs library header. */
74 #include "jobs.h"
75
76 /* JSON library includes. */
77 #include "core_json.h"
78
79 /* Include common MQTT demo helpers. */
80 #include "mqtt_demo_helpers.h"
81
82 /*------------- Demo configurations -------------------------*/
83
84 #ifndef democonfigTHING_NAME
85 #error "Please define the Thing resource name, democonfigTHING_NAME, in demo_config.h"
86 #endif
87
88 /**
89 * @brief The length of #democonfigTHING_NAME, computed as its sizeof minus one (i.e. without the terminating NUL of a string literal).
90 */
91 #define THING_NAME_LENGTH ( ( uint16_t ) ( sizeof( democonfigTHING_NAME ) - 1 ) )
92
93 /*-----------------------------------------------------------*/
94
95 /**
96 * @brief The JSON key of the execution object.
97 *
98 * Job documents received from the AWS IoT Jobs service are in JSON format.
99 * The job ID and the job document are looked up as members of this key
100 * (see the ".jobId" and ".jobDocument" query keys that are built from it).
101 */
102 #define jobsexampleEXECUTION_KEY "execution"
103
104 /**
105 * @brief The length of #jobsexampleEXECUTION_KEY.
106 */
107 #define jobsexampleEXECUTION_KEY_LENGTH ( sizeof( jobsexampleEXECUTION_KEY ) - 1 )
108
109 /**
110 * @brief The query key to use for searching the Job ID key in message payload
111 * from AWS IoT Jobs service.
112 *
113 * Job documents received from the AWS IoT Jobs service are in JSON format.
114 * All such JSON documents will contain this key, whose value represents the unique
115 * identifier of a Job.
116 */
117 #define jobsexampleQUERY_KEY_FOR_JOB_ID jobsexampleEXECUTION_KEY ".jobId"
118
119 /**
120 * @brief The length of #jobsexampleQUERY_KEY_FOR_JOB_ID.
121 */
122 #define jobsexampleQUERY_KEY_FOR_JOB_ID_LENGTH ( sizeof( jobsexampleQUERY_KEY_FOR_JOB_ID ) - 1 )
123
124 /**
125 * @brief The query key to use for searching the job document in message payload
126 * from AWS IoT Jobs service.
127 *
128 * Job documents received from the AWS IoT Jobs service are in JSON format.
129 * The value of this key is the job document itself, i.e. the JSON in which this
130 * demo looks up the "action", "message" and "topic" keys.
131 */
132 #define jobsexampleQUERY_KEY_FOR_JOBS_DOC jobsexampleEXECUTION_KEY ".jobDocument"
133
134 /**
135 * @brief The length of #jobsexampleQUERY_KEY_FOR_JOBS_DOC.
136 */
137 #define jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH ( sizeof( jobsexampleQUERY_KEY_FOR_JOBS_DOC ) - 1 )
138
139 /**
140 * @brief The query key to use for searching the Action key in Jobs document
141 * from AWS IoT Jobs service.
142 *
143 * This demo program expects this key to be in the Job document. It is a key
144 * specific to this demo.
145 */
146 #define jobsexampleQUERY_KEY_FOR_ACTION "action"
147
148 /**
149 * @brief The length of #jobsexampleQUERY_KEY_FOR_ACTION.
150 */
151 #define jobsexampleQUERY_KEY_FOR_ACTION_LENGTH ( sizeof( jobsexampleQUERY_KEY_FOR_ACTION ) - 1 )
152
153 /**
154 * @brief The query key to use for searching the Message key in Jobs document
155 * from AWS IoT Jobs service.
156 *
157 * This demo program expects this key to be in the Job document if the "action"
158 * is either "publish" or "print". It represents the message that should be
159 * published or printed, respectively.
160 */
161 #define jobsexampleQUERY_KEY_FOR_MESSAGE "message"
162
163 /**
164 * @brief The length of #jobsexampleQUERY_KEY_FOR_MESSAGE.
165 */
166 #define jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH ( sizeof( jobsexampleQUERY_KEY_FOR_MESSAGE ) - 1 )
167
168 /**
169 * @brief The query key to use for searching the topic key in Jobs document
170 * from AWS IoT Jobs service.
171 *
172 * This demo program expects this key to be in the Job document if the "action"
173 * is "publish". It represents the MQTT topic on which the message should be
174 * published.
175 */
176 #define jobsexampleQUERY_KEY_FOR_TOPIC "topic"
177
178 /**
179 * @brief The length of #jobsexampleQUERY_KEY_FOR_TOPIC.
180 */
181 #define jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH ( sizeof( jobsexampleQUERY_KEY_FOR_TOPIC ) - 1 )
182
183 /**
184 * @brief Utility macro to generate the PUBLISH topic string to the
185 * DescribeJobExecution API of AWS IoT Jobs service for requesting
186 * the next pending job information.
187 *
188 * @param[in] thingName The name of the Thing resource to query for the
189 * next pending job. It is pasted between adjacent string tokens, so it must be a string literal.
190 */
191 #define DESCRIBE_NEXT_JOB_TOPIC( thingName ) \
192 ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_JOBID_NEXT "/" JOBS_API_GETPENDING )
193
194 /**
195 * @brief Utility macro to generate the subscription topic string for the
196 * NextJobExecutionChanged API of AWS IoT Jobs service that is required
197 * for getting notification about changes in the next pending job in the queue.
198 *
199 * @param[in] thingName The name of the Thing resource to query for the
200 * next pending Job. It is pasted between adjacent string tokens, so it must be a string literal.
201 */
202 #define NEXT_JOB_EXECUTION_CHANGED_TOPIC( thingName ) \
203 ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_NEXTJOBCHANGED )
204
205 /**
206 * @brief Format a JSON status message of the form {"status":"<x>"}.
207 *
208 * @param[in] x one of "IN_PROGRESS", "SUCCEEDED", or "FAILED", given as a string literal
209 */
210 #define MAKE_STATUS_REPORT( x ) "{\"status\":\"" x "\"}"
211
212 /**
213 * @brief The maximum number of times to run the loop in this demo. Can be overridden by defining it before this point.
214 *
215 * @note The demo loop is attempted to re-run only if it fails in an iteration.
216 * Once the demo loop succeeds in an iteration, the demo exits successfully.
217 */
218 #ifndef JOBS_MAX_DEMO_LOOP_COUNT
219 #define JOBS_MAX_DEMO_LOOP_COUNT ( 3 )
220 #endif
221
222 /**
223 * @brief Time in ticks (converted from 5000 ms) to wait between retries of the demo loop if
224 * demo loop fails.
225 */
226 #define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
227
228 /**
229 * @brief Length (number of items) of the queue to pass Jobs messages to the job handling task.
230 */
231 #define JOBS_MESSAGE_QUEUE_LEN ( 10U )
232
233 /*-----------------------------------------------------------*/
234
235 /**
236 * @brief Currently supported actions that a job document can specify in its "action" key.
237 */
238 typedef enum JobActionType
239 {
240 JOB_ACTION_PRINT, /**< Print a message. */
241 JOB_ACTION_PUBLISH, /**< Publish a message to an MQTT topic. */
242 JOB_ACTION_EXIT, /**< Exit the demo. */
243 JOB_ACTION_UNKNOWN /**< Unknown action; the result when no supported action name matches. */
244 } JobActionType;
245
246 /*-----------------------------------------------------------*/
247
248 /**
249 * @brief Each compilation unit that consumes the NetworkContext must define it.
250 * It should contain a single pointer to the type of your desired transport.
251 * When using multiple transports in the same compilation unit, define this pointer as void *.
252 *
253 * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport.
254 */
255 struct NetworkContext
256 {
257 TlsTransportParams_t * pParams; /**< TLS transport parameters; the demo task points this at xTlsTransportParams. */
258 };
259
260 /*-----------------------------------------------------------*/
261
262 /**
263 * @brief The MQTT context used for MQTT operation.
264 */
265 static MQTTContext_t xMqttContext;
266
267 /**
268 * @brief The network context used for mbedTLS operation.
269 */
270 static NetworkContext_t xNetworkContext;
271
272 /**
273 * @brief The parameters for the network context using mbedTLS operation.
274 */
275 static TlsTransportParams_t xTlsTransportParams;
276
277 /**
278 * @brief Static buffer used to hold MQTT messages being sent and received.
279 */
280 static uint8_t usMqttConnectionBuffer[ democonfigNETWORK_BUFFER_SIZE ];
281
282 /**
283 * @brief Static buffer used to hold the job ID of the single job that
284 * is executed at a time in the demo. This buffer allows re-use of the MQTT
285 * connection context for sending status updates of a job while it is being
286 * processed. The job ID is copied in without a terminating NUL; its length is tracked separately.
287 */
288 static uint8_t usJobIdBuffer[ democonfigNETWORK_BUFFER_SIZE ];
289
290 /**
291 * @brief Static buffer used to hold the job document of the single job that
292 * is executed at a time in the demo. This buffer allows re-use of the MQTT
293 * connection context for sending status updates of a job while it is being processed.
294 */
295 static uint8_t usJobsDocumentBuffer[ democonfigNETWORK_BUFFER_SIZE ];
296
297 /**
298 * @brief MQTT fixed-buffer descriptor that wraps usMqttConnectionBuffer and its size.
299 */
300 static MQTTFixedBuffer_t xBuffer =
301 {
302 .pBuffer = usMqttConnectionBuffer,
303 .size = democonfigNETWORK_BUFFER_SIZE
304 };
305
306 /**
307 * @brief A global flag which represents whether a job for the "Exit" action
308 * has been received from AWS IoT Jobs service.
309 */
310 static BaseType_t xExitActionJobReceived = pdFALSE;
311
312 /**
313 * @brief A global flag which represents whether an error was encountered while
314 * executing the demo.
315 *
316 * @note When this flag is set, the demo terminates execution.
317 */
318 static BaseType_t xDemoEncounteredError = pdFALSE;
319
320 /**
321 * @brief Queue used to pass incoming Jobs messages (as MQTTPublishInfo_t pointers) to a task to handle them.
322 */
323 static QueueHandle_t xJobMessageQueue;
324
325 /*-----------------------------------------------------------*/
326
327 /**
328 * @brief Converts a string in a job document to a #JobActionType
329 * value.
330 *
331 * @param[in] pcAction The job action as a string.
332 * @param[in] xActionLength The length of @p pcAction.
333 *
334 * @return A #JobActionType equivalent to the given string.
335 */
336 static JobActionType prvGetAction( const char * pcAction,
337 size_t xActionLength );
338
339 /**
340 * @brief This example uses the MQTT library of the AWS IoT Device SDK for
341 * Embedded C. This is the prototype of the callback function defined by
342 * that library. It will be invoked whenever the MQTT library receives an
343 * incoming message.
344 *
345 * @param[in] pxMqttContext MQTT context pointer.
346 * @param[in] pxPacketInfo Packet Info pointer for the incoming packet.
347 * @param[in] pxDeserializedInfo Deserialized information from the incoming packet.
348 */
349 static void prvEventCallback( MQTTContext_t * pxMqttContext,
350 MQTTPacketInfo_t * pxPacketInfo,
351 MQTTDeserializedInfo_t * pxDeserializedInfo );
352
353 /**
354 * @brief Process payload from NextJobExecutionChanged and DescribeJobExecution
355 * API MQTT topics of AWS IoT Jobs service.
356 *
357 * This handler parses the received payload about the next pending job, identifies
358 * the action requested in the job document, and executes the action.
359 *
360 * @param[in] pxPublishInfo Deserialized publish info pointer for the incoming
361 * packet.
362 */
363 static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo );
364
365 /**
366 * @brief Sends an update for a job to the UpdateJobExecution API of the AWS IoT Jobs service.
367 *
368 * @param[in] pcJobId The job ID whose status has to be updated.
369 * @param[in] usJobIdLength The length of the job ID string.
370 * @param[in] pcJobStatusReport The NUL-terminated, JSON formatted report to send to the AWS IoT Jobs service
371 * to update the status of @p pcJobId.
372 */
373 static void prvSendUpdateForJob( char * pcJobId,
374 uint16_t usJobIdLength,
375 const char * pcJobStatusReport );
376
377 /**
378 * @brief Executes a job received from AWS IoT Jobs service and sends an update back to the service.
379 * It parses the received job document, executes the job depending on the job "Action" type, and
380 * sends an update to AWS for the Job.
381 *
382 * @param[in] pcJobId The ID of the job to execute.
383 * @param[in] usJobIdLength The length of the job ID string.
384 * @param[in] pcJobDocument The JSON document associated with the @p pcJobId job
385 * that is to be processed.
386 * @param[in] uxJobDocumentLength The length of the job document.
387 */
388 static void prvProcessJobDocument( char * pcJobId,
389 uint16_t usJobIdLength,
390 char * pcJobDocument,
391 size_t uxJobDocumentLength );
392
393 /**
394 * @brief The task used to demonstrate the Jobs library API.
395 *
396 * @param[in] pvParameters Parameters as passed at the time of task creation.
397 * Not used in this example.
398 */
399 void prvJobsDemoTask( void * pvParameters );
400
401 /*-----------------------------------------------------------*/
402
403 extern BaseType_t xPlatformIsNetworkUp( void );
404
405 /*-----------------------------------------------------------*/
406
prvGetAction(const char * pcAction,size_t xActionLength)407 static JobActionType prvGetAction( const char * pcAction,
408 size_t xActionLength )
409 {
410 JobActionType xAction = JOB_ACTION_UNKNOWN;
411
412 configASSERT( pcAction != NULL );
413
414 if( strncmp( pcAction, "print", xActionLength ) == 0 )
415 {
416 xAction = JOB_ACTION_PRINT;
417 }
418 else if( strncmp( pcAction, "publish", xActionLength ) == 0 )
419 {
420 xAction = JOB_ACTION_PUBLISH;
421 }
422 else if( strncmp( pcAction, "exit", xActionLength ) == 0 )
423 {
424 xAction = JOB_ACTION_EXIT;
425 }
426
427 return xAction;
428 }
429
prvSendUpdateForJob(char * pcJobId,uint16_t usJobIdLength,const char * pcJobStatusReport)430 static void prvSendUpdateForJob( char * pcJobId,
431 uint16_t usJobIdLength,
432 const char * pcJobStatusReport )
433 {
434 char pUpdateJobTopic[ JOBS_API_MAX_LENGTH( THING_NAME_LENGTH ) ];
435 size_t ulTopicLength = 0;
436 JobsStatus_t xStatus = JobsSuccess;
437
438 configASSERT( ( pcJobId != NULL ) && ( usJobIdLength > 0 ) );
439 configASSERT( pcJobStatusReport != NULL );
440
441 /* Generate the PUBLISH topic string for the UpdateJobExecution API of AWS IoT Jobs service. */
442 xStatus = Jobs_Update( pUpdateJobTopic,
443 sizeof( pUpdateJobTopic ),
444 democonfigTHING_NAME,
445 THING_NAME_LENGTH,
446 pcJobId,
447 usJobIdLength,
448 &ulTopicLength );
449
450 if( xStatus == JobsSuccess )
451 {
452 if( xPublishToTopic( &xMqttContext,
453 pUpdateJobTopic,
454 ulTopicLength,
455 pcJobStatusReport,
456 strlen( pcJobStatusReport ) ) == pdFALSE )
457 {
458 /* Set global flag to terminate demo as PUBLISH operation to update job status failed. */
459 xDemoEncounteredError = pdTRUE;
460
461 LogError( ( "Failed to update the status of job: JobID=%.*s, NewStatePayload=%s",
462 usJobIdLength, pcJobId, pcJobStatusReport ) );
463 }
464 }
465 else
466 {
467 /* Set global flag to terminate demo as topic generation for UpdateJobExecution API failed. */
468 xDemoEncounteredError = pdTRUE;
469
470 LogError( ( "Failed to generate Publish topic string for sending job update: "
471 "JobID=%.*s, NewStatePayload=%s",
472 usJobIdLength, pcJobId, pcJobStatusReport ) );
473 }
474 }
475
prvProcessJobDocument(char * pcJobId,uint16_t usJobIdLength,char * pcJobDocument,size_t uxJobDocumentLength)476 static void prvProcessJobDocument( char * pcJobId,
477 uint16_t usJobIdLength,
478 char * pcJobDocument,
479 size_t uxJobDocumentLength )
480 {
481 char * pcAction = NULL;
482 size_t uActionLength = 0U;
483 JSONStatus_t xJsonStatus = JSONSuccess;
484
485 configASSERT( pcJobId != NULL );
486 configASSERT( usJobIdLength > 0 );
487 configASSERT( pcJobDocument != NULL );
488 configASSERT( uxJobDocumentLength > 0 );
489
490 xJsonStatus = JSON_Search( pcJobDocument,
491 uxJobDocumentLength,
492 jobsexampleQUERY_KEY_FOR_ACTION,
493 jobsexampleQUERY_KEY_FOR_ACTION_LENGTH,
494 &pcAction,
495 &uActionLength );
496
497 if( xJsonStatus != JSONSuccess )
498 {
499 LogError( ( "Job document schema is invalid. Missing expected \"action\" key in document." ) );
500 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
501 }
502 else
503 {
504 JobActionType xActionType = JOB_ACTION_UNKNOWN;
505 char * pcMessage = NULL;
506 size_t ulMessageLength = 0U;
507
508 xActionType = prvGetAction( pcAction, uActionLength );
509
510 switch( xActionType )
511 {
512 case JOB_ACTION_EXIT:
513 LogInfo( ( "Received job contains \"exit\" action. Updating state of demo." ) );
514 xExitActionJobReceived = pdTRUE;
515 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
516 break;
517
518 case JOB_ACTION_PRINT:
519 LogInfo( ( "Received job contains \"print\" action." ) );
520
521 xJsonStatus = JSON_Search( pcJobDocument,
522 uxJobDocumentLength,
523 jobsexampleQUERY_KEY_FOR_MESSAGE,
524 jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
525 &pcMessage,
526 &ulMessageLength );
527
528 if( xJsonStatus == JSONSuccess )
529 {
530 /* Print the given message if the action is "print". */
531 LogInfo( ( "\r\n"
532 "/*-----------------------------------------------------------*/\r\n"
533 "\r\n"
534 "%.*s\r\n"
535 "\r\n"
536 "/*-----------------------------------------------------------*/\r\n"
537 "\r\n", ulMessageLength, pcMessage ) );
538 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
539 }
540 else
541 {
542 LogError( ( "Job document schema is invalid. Missing \"message\" for \"print\" action type." ) );
543 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
544 }
545
546 break;
547
548 case JOB_ACTION_PUBLISH:
549 LogInfo( ( "Received job contains \"publish\" action." ) );
550 char * pcTopic = NULL;
551 size_t ulTopicLength = 0U;
552
553 xJsonStatus = JSON_Search( pcJobDocument,
554 uxJobDocumentLength,
555 jobsexampleQUERY_KEY_FOR_TOPIC,
556 jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH,
557 &pcTopic,
558 &ulTopicLength );
559
560 /* Search for "topic" key in the Jobs document.*/
561 if( xJsonStatus != JSONSuccess )
562 {
563 LogError( ( "Job document schema is invalid. Missing \"topic\" key for \"publish\" action type." ) );
564 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
565 }
566 else
567 {
568 xJsonStatus = JSON_Search( pcJobDocument,
569 uxJobDocumentLength,
570 jobsexampleQUERY_KEY_FOR_MESSAGE,
571 jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
572 &pcMessage,
573 &ulMessageLength );
574
575 /* Search for "message" key in Jobs document.*/
576 if( xJsonStatus == JSONSuccess )
577 {
578 /* Publish to the parsed MQTT topic with the message obtained from
579 * the Jobs document.*/
580 if( xPublishToTopic( &xMqttContext,
581 pcTopic,
582 ulTopicLength,
583 pcMessage,
584 ulMessageLength ) == pdFALSE )
585 {
586 /* Set global flag to terminate demo as PUBLISH operation to execute job failed. */
587 xDemoEncounteredError = pdTRUE;
588
589 LogError( ( "Failed to execute job with \"publish\" action: Failed to publish to topic. "
590 "JobID=%.*s, Topic=%.*s",
591 usJobIdLength, pcJobId, ulTopicLength, pcTopic ) );
592 }
593
594 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "SUCCEEDED" ) );
595 }
596 else
597 {
598 LogError( ( "Job document schema is invalid. Missing \"message\" key for \"publish\" action type." ) );
599 prvSendUpdateForJob( pcJobId, usJobIdLength, MAKE_STATUS_REPORT( "FAILED" ) );
600 }
601 }
602
603 break;
604
605 default:
606 configPRINTF( ( "Received Job document with unknown action %.*s.",
607 uActionLength, pcAction ) );
608 break;
609 }
610 }
611 }
612
/**
 * @brief Handles a payload received for the next pending job: extracts the
 * job ID and the job document, copies both into the static buffers and runs
 * the job with prvProcessJobDocument().
 *
 * The job ID and document lengths come from the received payload, so they are
 * checked at run time before being copied into usJobIdBuffer and
 * usJobsDocumentBuffer; configASSERT() alone is not relied upon as the only
 * guard for those copies. Messages that fail any check are logged and dropped.
 *
 * @param[in] pxPublishInfo Deserialized publish info for the incoming packet.
 * Its payload must be non-NULL and non-empty.
 */
static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
{
    configASSERT( pxPublishInfo != NULL );
    configASSERT( ( pxPublishInfo->pPayload != NULL ) && ( pxPublishInfo->payloadLength > 0 ) );

    /* Check validity of JSON message response from server.*/
    if( JSON_Validate( pxPublishInfo->pPayload, pxPublishInfo->payloadLength ) != JSONSuccess )
    {
        LogError( ( "Received invalid JSON payload from AWS IoT Jobs service" ) );
    }
    else
    {
        char * pcJobId = NULL;
        size_t ulJobIdLength = 0UL;
        char * pcJobDocLoc = NULL;
        size_t ulJobDocLength = 0UL;

        /* Parse the Job ID of the next pending job execution from the JSON payload. */
        if( JSON_Search( ( char * ) pxPublishInfo->pPayload,
                         pxPublishInfo->payloadLength,
                         jobsexampleQUERY_KEY_FOR_JOB_ID,
                         jobsexampleQUERY_KEY_FOR_JOB_ID_LENGTH,
                         &pcJobId,
                         &ulJobIdLength ) != JSONSuccess )
        {
            /* "%.*s" needs an int precision and a char pointer argument. */
            LogWarn( ( "Failed to parse Job ID in message received from AWS IoT Jobs service: "
                       "IncomingTopic=%.*s, Payload=%.*s",
                       ( int ) pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName,
                       ( int ) pxPublishInfo->payloadLength, ( const char * ) pxPublishInfo->pPayload ) );
        }

        /* NOTE(review): the upper bound mirrors the original assertion, which
         * required the length to be strictly below JOBS_JOBID_MAX_LENGTH;
         * confirm whether a length equal to the maximum should be accepted. */
        else if( ( ulJobIdLength == 0U ) ||
                 ( ulJobIdLength >= JOBS_JOBID_MAX_LENGTH ) ||
                 ( ulJobIdLength > sizeof( usJobIdBuffer ) ) )
        {
            LogError( ( "Ignoring job with unsupported Job ID length: JobIdLength=%lu",
                        ( unsigned long ) ulJobIdLength ) );
        }
        else
        {
            LogInfo( ( "Received a Job from AWS IoT Jobs service: JobId=%.*s",
                       ( int ) ulJobIdLength, pcJobId ) );

            /* Copy the Job ID in the global buffer. This is done so that
             * the MQTT context's network buffer can be used for sending jobs
             * status updates to the AWS IoT Jobs service. */
            memcpy( usJobIdBuffer, pcJobId, ulJobIdLength );

            /* Search for the jobs document in the payload. */
            if( JSON_Search( ( char * ) pxPublishInfo->pPayload,
                             pxPublishInfo->payloadLength,
                             jobsexampleQUERY_KEY_FOR_JOBS_DOC,
                             jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH,
                             &pcJobDocLoc,
                             &ulJobDocLength ) != JSONSuccess )
            {
                LogWarn( ( "Failed to parse document of next job received from AWS IoT Jobs service: "
                           "Topic=%.*s, JobID=%.*s",
                           ( int ) pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName,
                           ( int ) ulJobIdLength, pcJobId ) );
            }
            else if( ( ulJobDocLength == 0U ) ||
                     ( ulJobDocLength > sizeof( usJobsDocumentBuffer ) ) )
            {
                LogError( ( "Ignoring job with unsupported job document length: "
                            "JobID=%.*s, DocumentLength=%lu",
                            ( int ) ulJobIdLength, pcJobId,
                            ( unsigned long ) ulJobDocLength ) );
            }
            else
            {
                /* Copy the Job document in buffer. This is done so that the MQTT connection buffer can
                 * be used for sending jobs status updates to the AWS IoT Jobs service. */
                memcpy( usJobsDocumentBuffer, pcJobDocLoc, ulJobDocLength );

                /* Process the Job document and execute the job. The static
                 * buffers are uint8_t arrays, hence the explicit casts. */
                prvProcessJobDocument( ( char * ) usJobIdBuffer,
                                       ( uint16_t ) ulJobIdLength,
                                       ( char * ) usJobsDocumentBuffer,
                                       ulJobDocLength );
            }
        }
    }
}
683
684 /*-----------------------------------------------------------*/
685
686 /* This is the callback function invoked by the MQTT stack when it receives
687 * incoming messages. This function demonstrates how to use the Jobs_MatchTopic
688 * function to determine whether the incoming message is a Jobs message
689 * or not. If it is, it handles the message depending on the message type.
690 */
prvEventCallback(MQTTContext_t * pxMqttContext,MQTTPacketInfo_t * pxPacketInfo,MQTTDeserializedInfo_t * pxDeserializedInfo)691 static void prvEventCallback( MQTTContext_t * pxMqttContext,
692 MQTTPacketInfo_t * pxPacketInfo,
693 MQTTDeserializedInfo_t * pxDeserializedInfo )
694 {
695 uint16_t usPacketIdentifier;
696
697 ( void ) pxMqttContext;
698
699 configASSERT( pxDeserializedInfo != NULL );
700 configASSERT( pxMqttContext != NULL );
701 configASSERT( pxPacketInfo != NULL );
702
703 usPacketIdentifier = pxDeserializedInfo->packetIdentifier;
704
705 /* Handle incoming publish. The lower 4 bits of the publish packet
706 * type is used for the dup, QoS, and retain flags. Hence masking
707 * out the lower bits to check if the packet is publish. */
708 if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
709 {
710 configASSERT( pxDeserializedInfo->pPublishInfo != NULL );
711 JobsTopic_t topicType = JobsMaxTopic;
712 JobsStatus_t xStatus = JobsError;
713
714 LogDebug( ( "Received an incoming publish message: TopicName=%.*s",
715 pxDeserializedInfo->pPublishInfo->topicNameLength,
716 ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
717
718 /* Let the Jobs library tell us whether this is a Jobs message. */
719 xStatus = Jobs_MatchTopic( ( char * ) pxDeserializedInfo->pPublishInfo->pTopicName,
720 pxDeserializedInfo->pPublishInfo->topicNameLength,
721 democonfigTHING_NAME,
722 THING_NAME_LENGTH,
723 &topicType,
724 NULL,
725 NULL );
726
727 if( xStatus == JobsSuccess )
728 {
729 /* Upon successful return, the messageType has been filled in. */
730 if( ( topicType == JobsDescribeSuccess ) || ( topicType == JobsNextJobChanged ) )
731 {
732 MQTTPublishInfo_t * pxJobMessagePublishInfo = NULL;
733 char * pcTopicName = NULL;
734 char * pcPayload = NULL;
735
736 /* Copy message to pass into queue. */
737 pxJobMessagePublishInfo = ( MQTTPublishInfo_t * ) pvPortMalloc( sizeof( MQTTPublishInfo_t ) );
738 pcTopicName = ( char * ) pvPortMalloc( pxDeserializedInfo->pPublishInfo->topicNameLength );
739 pcPayload = ( char * ) pvPortMalloc( pxDeserializedInfo->pPublishInfo->payloadLength );
740
741 if( ( pxJobMessagePublishInfo == NULL ) || ( pcTopicName == NULL ) || ( pcPayload == NULL ) )
742 {
743 LogError( ( "Malloc failed for copying job publish info." ) );
744
745 if( pxJobMessagePublishInfo != NULL )
746 {
747 vPortFree( pxJobMessagePublishInfo );
748 }
749
750 if( pcTopicName != NULL )
751 {
752 vPortFree( pcTopicName );
753 }
754
755 if( pcPayload != NULL )
756 {
757 vPortFree( pcPayload );
758 }
759 }
760 else
761 {
762 memcpy( pxJobMessagePublishInfo, pxDeserializedInfo->pPublishInfo, sizeof( MQTTPublishInfo_t ) );
763 memcpy( pcTopicName, pxDeserializedInfo->pPublishInfo->pTopicName, pxDeserializedInfo->pPublishInfo->topicNameLength );
764 memcpy( pcPayload, pxDeserializedInfo->pPublishInfo->pPayload, pxDeserializedInfo->pPublishInfo->payloadLength );
765
766 pxJobMessagePublishInfo->pTopicName = pcTopicName;
767 pxJobMessagePublishInfo->pPayload = pcPayload;
768
769 if( xQueueSend( xJobMessageQueue, &pxJobMessagePublishInfo, 0 ) == errQUEUE_FULL )
770 {
771 LogError( ( "Could not enqueue Jobs message." ) );
772
773 vPortFree( pxJobMessagePublishInfo );
774 vPortFree( pcTopicName );
775 vPortFree( pcPayload );
776 }
777 }
778 }
779 else if( topicType == JobsUpdateSuccess )
780 {
781 LogInfo( ( "Job update status request has been accepted by AWS Iot Jobs service." ) );
782 }
783 else if( topicType == JobsStartNextFailed )
784 {
785 LogWarn( ( "Request for next job description rejected: RejectedResponse=%.*s.",
786 pxDeserializedInfo->pPublishInfo->payloadLength,
787 ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
788 }
789 else if( topicType == JobsUpdateFailed )
790 {
791 /* Set the global flag to terminate the demo, because the request for updating and executing the job status
792 * has been rejected by the AWS IoT Jobs service. */
793 xDemoEncounteredError = pdTRUE;
794
795 LogWarn( ( "Request for job update rejected: RejectedResponse=%.*s.",
796 pxDeserializedInfo->pPublishInfo->payloadLength,
797 ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
798
799 LogError( ( "Terminating demo as request to update job status has been rejected by "
800 "AWS IoT Jobs service..." ) );
801 }
802 else
803 {
804 LogWarn( ( "Received an unexpected messages from AWS IoT Jobs service: "
805 "JobsTopicType=%u", topicType ) );
806 }
807 }
808 else if( xStatus == JobsNoMatch )
809 {
810 LogWarn( ( "Incoming message topic does not belong to AWS IoT Jobs!: topic=%.*s",
811 pxDeserializedInfo->pPublishInfo->topicNameLength,
812 ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
813 }
814 else
815 {
816 LogError( ( "Failed to parse incoming publish job. Topic=%.*s!",
817 pxDeserializedInfo->pPublishInfo->topicNameLength,
818 ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
819 }
820 }
821 else
822 {
823 vHandleOtherIncomingPacket( pxPacketInfo, usPacketIdentifier );
824 }
825 }
826
827 /*-----------------------------------------------------------*/
828
829 /**
830 * @brief Entry point of the Jobs demo.
831 *
832 * This function demonstrates how to use the Jobs library API.
833 *
834 * This demo uses helper functions for MQTT operations that have internal
835 * loops to process incoming messages. Those are not the focus of this demo
836 * and therefore, are placed in a separate file mqtt_demo_helpers.c.
837 *
838 * This function also shows that the communication with the AWS IoT Jobs services does
839 * not require explicit subscriptions to the response MQTT topics for request commands that
840 * are sent to the MQTT APIs (like DescribeJobExecution API) of the service. The service
841 * will send messages on the response topics for the request commands on the same
842 * MQTT connection irrespective of whether the client subscribes to the response topics.
843 * Therefore, this demo processes incoming messages from response topics of DescribeJobExecution
844 * and UpdateJobExecution APIs without explicitly subscribing to the topics.
845 */
prvJobsDemoTask(void * pvParameters)846 void prvJobsDemoTask( void * pvParameters )
847 {
848 BaseType_t xDemoStatus = pdPASS;
849 UBaseType_t uxDemoRunCount = 0UL;
850 BaseType_t retryDemoLoop = pdFALSE;
851
852 /* Remove compiler warnings about unused parameters. */
853 ( void ) pvParameters;
854
855 /* Set the pParams member of the network context with desired transport. */
856 xNetworkContext.pParams = &xTlsTransportParams;
857
858 /* Initialize Jobs message queue. */
859 xJobMessageQueue = xQueueCreate( JOBS_MESSAGE_QUEUE_LEN, sizeof( MQTTPublishInfo_t * ) );
860 configASSERT( xJobMessageQueue != NULL );
861
862 /* This demo runs a single loop unless there are failures in the demo execution.
863 * In case of failures in the demo execution, demo loop will be retried for up to
864 * JOBS_MAX_DEMO_LOOP_COUNT times. */
865 do
866 {
867 LogInfo( ( "---------STARTING DEMO---------\r\n" ) );
868
869 if( xPlatformIsNetworkUp() == pdFALSE )
870 {
871 LogInfo( ( "Waiting for the network link up event..." ) );
872
873 while( xPlatformIsNetworkUp() == pdFALSE )
874 {
875 vTaskDelay( pdMS_TO_TICKS( 1000U ) );
876 }
877 }
878
879 /* Establish an MQTT connection with AWS IoT over a mutually authenticated TLS session. */
880 xDemoStatus = xEstablishMqttSession( &xMqttContext,
881 &xNetworkContext,
882 &xBuffer,
883 prvEventCallback );
884
885 if( xDemoStatus == pdFAIL )
886 {
887 /* Log error to indicate connection failure. */
888 LogError( ( "Failed to connect to AWS IoT broker." ) );
889 }
890 else
891 {
892 /* Print out a short user guide to the console. The default logging
893 * limit of 255 characters can be changed in demo_logging.c, but breaking
894 * up the only instance of a 1000+ character string is more practical. */
895 LogInfo( ( "\r\n"
896 "/*-----------------------------------------------------------*/\r\n"
897 "\r\n"
898 "The Jobs demo is now ready to accept Jobs.\r\n"
899 "Jobs may be created using the AWS IoT console or AWS CLI.\r\n"
900 "See the following link for more information.\r\n" ) );
901 LogInfo( ( "\r"
902 "https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html\r\n"
903 "\r\n"
904 "This demo expects Job documents to have an \"action\" JSON key.\r\n"
905 "The following actions are currently supported:\r\n" ) );
906 LogInfo( ( "\r"
907 " - print \r\n"
908 " Logs a message to the local console. The Job document must also contain a \"message\".\r\n"
909 " For example: { \"action\": \"print\", \"message\": \"Hello world!\"} will cause\r\n"
910 " \"Hello world!\" to be printed on the console.\r\n" ) );
911 LogInfo( ( "\r"
912 " - publish \r\n"
913 " Publishes a message to an MQTT topic. The Job document must also contain a \"message\" and \"topic\".\r\n" ) );
914 LogInfo( ( "\r"
915 " For example: { \"action\": \"publish\", \"topic\": \"demo/jobs\", \"message\": \"Hello world!\"} will cause\r\n"
916 " \"Hello world!\" to be published to the topic \"demo/jobs\".\r\n" ) );
917 LogInfo( ( "\r"
918 " - exit \r\n"
919 " Exits the demo program. This program will run until { \"action\": \"exit\" } is received.\r\n"
920 "\r\n"
921 "/*-----------------------------------------------------------*/\r\n" ) );
922
923 /* Subscribe to the NextJobExecutionChanged API topic to receive notifications about the next pending
924 * job in the queue for the Thing resource used by this demo. */
925 if( xSubscribeToTopic( &xMqttContext,
926 NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
927 sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
928 {
929 xDemoStatus = pdFAIL;
930 LogError( ( "Failed to subscribe to NextJobExecutionChanged API of AWS IoT Jobs service: Topic=%s",
931 NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
932 }
933 }
934
935 if( xDemoStatus == pdPASS )
936 {
937 /* Publish to AWS IoT Jobs on the DescribeJobExecution API to request the next pending job.
938 *
939 * Note: It is not required to make MQTT subscriptions to the response topics of the
940 * DescribeJobExecution API because the AWS IoT Jobs service sends responses for
941 * the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
942 * to the response topics or not.
943 * This demo processes incoming messages from the response topics of the API in the prvEventCallback()
944 * handler that is supplied to the coreMQTT library. */
945 if( xPublishToTopic( &xMqttContext,
946 DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
947 sizeof( DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
948 NULL,
949 0 ) != pdPASS )
950 {
951 xDemoStatus = pdFAIL;
952 LogError( ( "Failed to publish to DescribeJobExecution API of AWS IoT Jobs service: "
953 "Topic=%s", DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
954 }
955 }
956
957 /* Keep on running the demo until we receive a job for the "exit" action to exit the demo. */
958 while( ( xExitActionJobReceived == pdFALSE ) &&
959 ( xDemoEncounteredError == pdFALSE ) &&
960 ( xDemoStatus == pdPASS ) )
961 {
962 MQTTPublishInfo_t * pxJobMessagePublishInfo;
963 MQTTStatus_t xMqttStatus = MQTTSuccess;
964
965 /* Check if we have notification for the next pending job in the queue from the
966 * NextJobExecutionChanged API of the AWS IoT Jobs service. */
967 xMqttStatus = MQTT_ProcessLoop( &xMqttContext );
968
969 /* Receive any incoming Jobs message. */
970 if( xQueueReceive( xJobMessageQueue, &pxJobMessagePublishInfo, 0 ) == pdTRUE )
971 {
972 /* Handler function to process Jobs message payload. */
973 prvNextJobHandler( pxJobMessagePublishInfo );
974 vPortFree( ( void * ) ( pxJobMessagePublishInfo->pTopicName ) );
975 vPortFree( ( void * ) ( pxJobMessagePublishInfo->pPayload ) );
976 vPortFree( pxJobMessagePublishInfo );
977 }
978
979 if( xMqttStatus != MQTTSuccess )
980 {
981 xDemoStatus = pdFAIL;
982 LogError( ( "Failed to receive notification about next pending job: "
983 "MQTT_ProcessLoop failed" ) );
984 }
985 }
986
987 /* Increment the demo run count. */
988 uxDemoRunCount++;
989
990 /* Retry demo loop only if there is a failure before completing
991 * the processing of any pending jobs. Any failure in MQTT unsubscribe
992 * or disconnect is considered a failure in demo execution and retry
993 * will not be attempted since a retry without any pending jobs will
994 * make this demo indefinitely wait. */
995 if( ( xDemoStatus == pdFAIL ) || ( xDemoEncounteredError == pdTRUE ) )
996 {
997 if( uxDemoRunCount < JOBS_MAX_DEMO_LOOP_COUNT )
998 {
999 LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
1000 retryDemoLoop = pdTRUE;
1001 }
1002 else
1003 {
1004 LogError( ( "All %d demo iterations failed.", JOBS_MAX_DEMO_LOOP_COUNT ) );
1005 retryDemoLoop = pdFALSE;
1006 }
1007 }
1008 else
1009 {
1010 /* Reset the flag for demo retry. */
1011 retryDemoLoop = pdFALSE;
1012 }
1013
1014 /* Unsubscribe from the NextJobExecutionChanged API topic. */
1015 if( xUnsubscribeFromTopic( &xMqttContext,
1016 NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
1017 sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
1018 {
1019 xDemoStatus = pdFAIL;
1020 LogError( ( "Failed to subscribe unsubscribe from the NextJobExecutionChanged API of AWS IoT Jobs service: "
1021 "Topic=%s", NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
1022 }
1023
1024 /* Disconnect the MQTT and network connections with AWS IoT. */
1025 if( xDisconnectMqttSession( &xMqttContext, &xNetworkContext ) != pdPASS )
1026 {
1027 xDemoStatus = pdFAIL;
1028 LogError( ( "Disconnection from AWS IoT failed..." ) );
1029 }
1030
1031 /* Add a delay if a retry is required. */
1032 if( retryDemoLoop == pdTRUE )
1033 {
1034 /* Clear the flag that indicates that indicates the demo error
1035 * before attempting a retry. */
1036 xDemoEncounteredError = pdFALSE;
1037
1038 LogInfo( ( "A short delay before the next demo iteration." ) );
1039 vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
1040 }
1041 } while( retryDemoLoop == pdTRUE );
1042
1043 if( ( xDemoEncounteredError == pdFALSE ) && ( xDemoStatus == pdPASS ) )
1044 {
1045 LogInfo( ( "Demo completed successfully." ) );
1046 }
1047
1048 LogInfo( ( "-------DEMO FINISHED-------\r\n" ) );
1049
1050 /* Delete this demo task. */
1051 LogInfo( ( "Deleting Jobs Demo task." ) );
1052 vTaskDelete( NULL );
1053 }
1054