1 /*******************************************************************************
2  * Copyright (c) 2014, 2017 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *   Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
15  *   Ian Craggs - fix for #96 - check rem_len in readPacket
16  *   Ian Craggs - add ability to set message handler separately #6
17  *******************************************************************************/
18 #include "MQTTClient.h"
19 
20 #include <stdio.h>
21 #include <string.h>
22 
/**
 * Fill in a MessageData record so that it refers to the given topic and message.
 *
 * Only the two pointers are stored; neither the topic nor the message is copied.
 *
 * @param md          record to populate
 * @param aTopicName  topic the message arrived on
 * @param aMessage    the message itself
 */
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage)
{
    md->message = aMessage;
    md->topicName = aTopicName;
}
27 
28 
/**
 * Advance the client's packet identifier and return the new value.
 *
 * The identifier wraps back to 1 once it has reached MAX_PACKET_ID.
 *
 * @param c  client whose next_packetid field is updated
 * @return the updated packet identifier
 */
static int getNextPacketId(MQTTClient *c)
{
    if (c->next_packetid == MAX_PACKET_ID)
        c->next_packetid = 1;
    else
        c->next_packetid = c->next_packetid + 1;

    return c->next_packetid;
}
32 
33 
/**
 * Write the first `length` bytes of the client's send buffer to the network.
 *
 * The write callback may accept fewer bytes than requested, so it is called
 * repeatedly until everything has been written, it reports an error (negative
 * return), or the timer expires.
 *
 * @param c       client providing the network and the send buffer (c->buf)
 * @param length  number of bytes of c->buf to send
 * @param timer   deadline for the whole send
 * @return SUCCESS when exactly `length` bytes were written, FAILURE otherwise
 */
static int sendPacket(MQTTClient* c, int length, Timer* timer)
{
    int sent = 0;

    while (sent < length && !TimerIsExpired(timer))
    {
        /* Only ask for the bytes that are still outstanding; requesting the
         * full length again after a partial write would read past the end of
         * the packet and could push `sent` beyond `length`. */
        int rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, TimerLeftMS(timer));
        if (rc < 0)  /* there was an error writing the data */
            break;
        sent += rc;
    }

    if (sent != length)
        return FAILURE;

    /* record the fact that we have successfully sent the packet */
    TimerCountdown(&c->last_sent, c->keepAliveInterval);
    return SUCCESS;
}
55 
56 
/**
 * Put an MQTTClient into its initial, disconnected state.
 *
 * @param c                   client object to initialise
 * @param network             network implementation stored in c->ipstack
 * @param command_timeout_ms  timeout stored for later blocking commands
 * @param sendbuf             buffer used to serialise outgoing packets
 * @param sendbuf_size        size of sendbuf
 * @param readbuf             buffer used to receive incoming packets
 * @param readbuf_size        size of readbuf
 */
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
		unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{
    int i;
    c->ipstack = network;

    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
    {
        c->messageHandlers[i].topicFilter = 0;
        c->messageHandlers[i].fp = NULL;   /* do not leave a stale callback pointer in an empty slot */
    }
    c->command_timeout_ms = command_timeout_ms;
    c->buf = sendbuf;
    c->buf_size = sendbuf_size;
    c->readbuf = readbuf;
    c->readbuf_size = readbuf_size;
    c->isconnected = 0;
    c->cleansession = 0;
    c->ping_outstanding = 0;
    /* keepalive() and readPacket() read this field; give it a defined value in
     * case cycle() runs before a connect has set it. */
    c->keepAliveInterval = 0;
    c->defaultMessageHandler = NULL;
    c->next_packetid = 1;
    TimerInit(&c->last_sent);
    TimerInit(&c->last_received);
#if defined(MQTT_TASK)
    MutexInit(&c->mutex);
#endif
}
81 
82 
decodePacket(MQTTClient * c,int * value,int timeout)83 static int decodePacket(MQTTClient* c, int* value, int timeout)
84 {
85     unsigned char i;
86     int multiplier = 1;
87     int len = 0;
88     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
89 
90     *value = 0;
91     do
92     {
93         int rc = MQTTPACKET_READ_ERROR;
94 
95         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
96         {
97             rc = MQTTPACKET_READ_ERROR; /* bad data */
98             goto exit;
99         }
100         rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
101         if (rc != 1)
102             goto exit;
103         *value += (i & 127) * multiplier;
104         multiplier *= 128;
105     } while ((i & 128) != 0);
106 exit:
107     return len;
108 }
109 
110 
readPacket(MQTTClient * c,Timer * timer)111 static int readPacket(MQTTClient* c, Timer* timer)
112 {
113     MQTTHeader header = {0};
114     int len = 0;
115     int rem_len = 0;
116 
117     /* 1. read the header byte.  This has the packet type in it */
118     int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
119     if (rc != 1)
120         goto exit;
121 
122     len = 1;
123     /* 2. read the remaining length.  This is variable in itself */
124     decodePacket(c, &rem_len, TimerLeftMS(timer));
125     len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
126 
127     if (rem_len > (c->readbuf_size - len))
128     {
129         rc = BUFFER_OVERFLOW;
130         goto exit;
131     }
132 
133     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
134     if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
135         rc = 0;
136         goto exit;
137     }
138 
139     header.byte = c->readbuf[0];
140     rc = header.bits.type;
141     if (c->keepAliveInterval > 0)
142         TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
143 exit:
144     return rc;
145 }
146 
147 
148 // assume topic filter and name is in correct format
149 // # can only be at end
150 // + and # can only be next to separator
isTopicMatched(char * topicFilter,MQTTString * topicName)151 static char isTopicMatched(char* topicFilter, MQTTString* topicName)
152 {
153     char* curf = topicFilter;
154     char* curn = topicName->lenstring.data;
155     char* curn_end = curn + topicName->lenstring.len;
156 
157     while (*curf && curn < curn_end)
158     {
159         if (*curn == '/' && *curf != '/')
160             break;
161         if (*curf != '+' && *curf != '#' && *curf != *curn)
162             break;
163         if (*curf == '+')
164         {   // skip until we meet the next separator, or end of string
165             char* nextpos = curn + 1;
166             while (nextpos < curn_end && *nextpos != '/')
167                 nextpos = ++curn + 1;
168         }
169         else if (*curf == '#')
170             curn = curn_end - 1;    // skip until end of string
171         curf++;
172         curn++;
173     };
174 
175     return (curn == curn_end) && (*curf == '\0');
176 }
177 
178 
/**
 * Hand an incoming message to every registered handler whose topic filter
 * matches, falling back to the default handler when none was called.
 *
 * A filter matches when it equals the topic name (MQTTPacket_equals) or when
 * isTopicMatched() accepts it.
 *
 * @param c          client holding the handler table
 * @param topicName  topic the message arrived on
 * @param message    the message to deliver
 * @return SUCCESS if at least one handler (or the default handler) was called,
 *         FAILURE otherwise
 */
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
    int delivered = FAILURE;
    int slot;

    /* we have to find the right message handler - indexed by topic */
    for (slot = 0; slot < MAX_MESSAGE_HANDLERS; ++slot)
    {
        const char* filter = c->messageHandlers[slot].topicFilter;

        if (filter == 0)
            continue;
        if (!MQTTPacket_equals(topicName, (char*)filter) &&
            !isTopicMatched((char*)filter, topicName))
            continue;
        if (c->messageHandlers[slot].fp == NULL)
            continue;

        {
            MessageData md;
            NewMessageData(&md, topicName, message);
            c->messageHandlers[slot].fp(&md);
            delivered = SUCCESS;
        }
    }

    if (delivered == FAILURE && c->defaultMessageHandler != NULL)
    {
        MessageData md;
        NewMessageData(&md, topicName, message);
        c->defaultMessageHandler(&md);
        delivered = SUCCESS;
    }

    return delivered;
}
210 
211 
/**
 * Send a PINGREQ when either keep-alive timer has expired.
 *
 * Nothing is done when the keep-alive interval is 0 or when neither the
 * last-sent nor the last-received timer has expired.
 *
 * @param c  client to service
 * @return FAILURE when a timer has expired while a ping is still outstanding
 *         (PINGRESP not received in the keep-alive interval); otherwise the
 *         result of sending the ping, or SUCCESS when nothing was sent
 */
int keepalive(MQTTClient* c)
{
    Timer ping_timer;
    int ping_len;
    int rc = SUCCESS;

    if (c->keepAliveInterval == 0)
        return SUCCESS;

    if (!TimerIsExpired(&c->last_sent) && !TimerIsExpired(&c->last_received))
        return SUCCESS;

    if (c->ping_outstanding)
        return FAILURE; /* PINGRESP not received in keepalive interval */

    TimerInit(&ping_timer);
    TimerCountdownMS(&ping_timer, 1000);
    ping_len = MQTTSerialize_pingreq(c->buf, c->buf_size);
    if (ping_len > 0)
    {
        rc = sendPacket(c, ping_len, &ping_timer); /* send the ping packet */
        if (rc == SUCCESS)
            c->ping_outstanding = 1;
    }

    return rc;
}
237 
238 
/**
 * Forget every registered topic filter by clearing the topicFilter field of
 * each entry in the client's handler table.
 *
 * @param c  client whose handler table is cleared
 */
void MQTTCleanSession(MQTTClient* c)
{
    int slot = 0;

    while (slot < MAX_MESSAGE_HANDLERS)
    {
        c->messageHandlers[slot].topicFilter = NULL;
        ++slot;
    }
}
246 
247 
/**
 * Mark the client as disconnected and clear any outstanding ping.
 *
 * When the session was opened with the clean-session flag, the registered
 * topic filters are cleared as well.
 *
 * @param c  client to update
 */
void MQTTCloseSession(MQTTClient* c)
{
    c->isconnected = 0;
    c->ping_outstanding = 0;

    if (c->cleansession)
    {
        MQTTCleanSession(c);
    }
}
255 
256 
/**
 * Read at most one packet from the network, react to it, and service the
 * keep-alive timer.
 *
 * Incoming PUBLISH packets are delivered to the message handlers and
 * acknowledged according to their QoS; PUBREC/PUBREL are answered with
 * PUBREL/PUBCOMP; PINGRESP clears the outstanding-ping flag.
 *
 * @param c      client to service
 * @param timer  deadline used for the read and for any reply that is sent
 * @return the packet type that was read (0 when readPacket() returned 0) if
 *         everything succeeded; otherwise a failure value, in which case the
 *         session is closed if the client was connected
 */
int cycle(MQTTClient* c, Timer* timer)
{
    int len = 0,
        rc = SUCCESS;

    int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */

    switch (packet_type)
    {
        default:
            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
            rc = packet_type;
            goto exit;
        case 0: /* timed out reading packet */
            break;
        case CONNACK:
        case PUBACK:
        case SUBACK:
        case UNSUBACK:
            break;
        case PUBLISH:
        {
            MQTTString topicName;
            MQTTMessage msg;
            int intQoS;
            /* msg.payloadlen is a size_t but the deserialiser writes an int,
             * so receive the value in a real int and convert afterwards. */
            int payloadlen = 0;

            if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
               (unsigned char**)&msg.payload, &payloadlen, c->readbuf, c->readbuf_size) != 1)
            {
                /* a PUBLISH that cannot be parsed must not be reported as a
                 * successfully handled packet */
                rc = FAILURE;
                goto exit;
            }
            msg.payloadlen = (size_t)payloadlen;
            msg.qos = (enum QoS)intQoS;
            deliverMessage(c, &topicName, &msg);
            if (msg.qos != QOS0)
            {
                if (msg.qos == QOS1)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
                else if (msg.qos == QOS2)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
                if (len <= 0)
                    rc = FAILURE;
                else
                    rc = sendPacket(c, len, timer);
                if (rc == FAILURE)
                    goto exit; /* there was a problem */
            }
            break;
        }
        case PUBREC:
        case PUBREL:
        {
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
                rc = FAILURE;
            else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
                (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
                rc = FAILURE;
            else if ((rc = sendPacket(c, len, timer)) != SUCCESS) /* send the PUBREL or PUBCOMP reply */
                rc = FAILURE; /* there was a problem */
            if (rc == FAILURE)
                goto exit; /* there was a problem */
            break;
        }

        case PUBCOMP:
            break;
        case PINGRESP:
            c->ping_outstanding = 0;
            break;
    }

    if (keepalive(c) != SUCCESS) {
        /* check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT */
        rc = FAILURE;
    }

exit:
    if (rc == SUCCESS)
        rc = packet_type;
    else if (c->isconnected)
        MQTTCloseSession(c);
    return rc;
}
339 
340 
/**
 * Service the client for up to timeout_ms milliseconds.
 *
 * cycle() is called at least once and then repeatedly until the timer
 * expires or cycle() returns a negative value.
 *
 * @param c           client to service
 * @param timeout_ms  how long to keep servicing, in milliseconds
 * @return FAILURE if a call to cycle() returned a negative value, SUCCESS otherwise
 */
int MQTTYield(MQTTClient* c, int timeout_ms)
{
    Timer yield_timer;

    TimerInit(&yield_timer);
    TimerCountdownMS(&yield_timer, timeout_ms);

    for (;;)
    {
        if (cycle(c, &yield_timer) < 0)
            return FAILURE;
        if (TimerIsExpired(&yield_timer))
            return SUCCESS;
    }
}
360 
/**
 * Report the client's connection flag.
 *
 * @param client  client to query
 * @return the current value of client->isconnected
 */
int MQTTIsConnected(MQTTClient* client)
{
    return (client->isconnected);
}
365 
MQTTRun(void * parm)366 void MQTTRun(void* parm)
367 {
368 	Timer timer;
369 	MQTTClient* c = (MQTTClient*)parm;
370 
371 	TimerInit(&timer);
372 
373 	while (1)
374 	{
375 #if defined(MQTT_TASK)
376 		MutexLock(&c->mutex);
377 #endif
378 		TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
379 		cycle(c, &timer);
380 #if defined(MQTT_TASK)
381 		MutexUnlock(&c->mutex);
382 #endif
383 	}
384 }
385 
386 
#if defined(MQTT_TASK)
/**
 * Start a thread that runs MQTTRun() for the given client.
 *
 * @param client  client passed to MQTTRun(); its thread field is handed to ThreadStart()
 * @return whatever ThreadStart() returns
 */
int MQTTStartTask(MQTTClient* client)
{
    return ThreadStart(&(client->thread), &MQTTRun, client);
}
#endif
393 
394 
/**
 * Keep calling cycle() until a packet of the requested type has been
 * processed, cycle() reports a negative value, or the timer expires.
 *
 * @param c            client to service
 * @param packet_type  packet type to wait for
 * @param timer        deadline for the wait
 * @return the last value returned by cycle(), or FAILURE if the timer had
 *         already expired before the first call
 */
int waitfor(MQTTClient* c, int packet_type, Timer* timer)
{
    int rc = FAILURE;

    while (!TimerIsExpired(timer))   /* stop once we have timed out */
    {
        rc = cycle(c, timer);
        if (rc == packet_type || rc < 0)
            break;
    }

    return rc;
}
409 
410 
411 
412 
/**
 * Send a CONNECT packet and wait for the CONNACK, reporting its contents.
 *
 * Nothing is sent when the client is already connected; FAILURE is returned
 * in that case. When MQTT_TASK is defined, the client's mutex is held for
 * the duration of the call.
 *
 * @param c        client to connect
 * @param options  connect options; default options are used when this is 0
 * @param data     receives the return code and session-present flag of the CONNACK
 * @return the CONNACK return code (SUCCESS marks the client as connected),
 *         the result of sendPacket() if sending failed, or FAILURE
 */
int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
{
    MQTTPacket_connectData fallback_options = MQTTPacket_connectData_initializer;
    Timer connect_timer;
    int packet_len = 0;
    int rc = FAILURE;

#if defined(MQTT_TASK)
    MutexLock(&c->mutex);
#endif
    /* don't send connect packet again if we are already connected */
    if (c->isconnected)
        goto exit;

    TimerInit(&connect_timer);
    TimerCountdownMS(&connect_timer, c->command_timeout_ms);

    /* set default options if none were supplied */
    if (options == 0)
        options = &fallback_options;

    c->keepAliveInterval = options->keepAliveInterval;
    c->cleansession = options->cleansession;
    TimerCountdown(&c->last_received, c->keepAliveInterval);

    packet_len = MQTTSerialize_connect(c->buf, c->buf_size, options);
    if (packet_len <= 0)
        goto exit;

    rc = sendPacket(c, packet_len, &connect_timer); /* send the connect packet */
    if (rc != SUCCESS)
        goto exit; /* there was a problem */

    /* this will be a blocking call, wait for the connack */
    rc = FAILURE;
    if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
    {
        data->rc = 0;
        data->sessionPresent = 0;
        if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
            rc = data->rc;
    }

exit:
    if (rc == SUCCESS)
    {
        c->isconnected = 1;
        c->ping_outstanding = 0;
    }

#if defined(MQTT_TASK)
    MutexUnlock(&c->mutex);
#endif

    return rc;
}
466 
467 
/**
 * Connect to the server, discarding the details of the CONNACK.
 *
 * @param c        client to connect
 * @param options  connect options, passed through to MQTTConnectWithResults()
 * @return the result of MQTTConnectWithResults()
 */
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
{
    MQTTConnackData unused_connack;
    int rc = MQTTConnectWithResults(c, options, &unused_connack);

    return rc;
}
473 
474 
/**
 * Register, replace or remove the message handler for a topic filter.
 *
 * An existing entry is located by comparing filter strings with strcmp().
 * Passing a NULL handler removes the matching entry. With a non-NULL handler
 * the matching entry is overwritten, or the first empty slot is used when
 * there is no match. The topicFilter pointer itself is stored, not a copy of
 * the string.
 *
 * @param c               client holding the handler table
 * @param topicFilter     filter to register or remove
 * @param messageHandler  callback to install, or NULL to remove
 * @return SUCCESS if a matching entry was found or an empty slot was filled,
 *         FAILURE otherwise
 */
int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
{
    int slot;
    int rc = FAILURE;

    /* first check for an existing matching slot */
    for (slot = 0; slot < MAX_MESSAGE_HANDLERS; ++slot)
    {
        const char* registered = c->messageHandlers[slot].topicFilter;

        if (registered == NULL || strcmp(registered, topicFilter) != 0)
            continue;

        if (messageHandler == NULL) /* remove existing */
        {
            c->messageHandlers[slot].topicFilter = NULL;
            c->messageHandlers[slot].fp = NULL;
        }
        rc = SUCCESS;
        break;
    }

    /* removal requests are finished at this point */
    if (messageHandler == NULL)
        return rc;

    /* if no existing, look for empty slot */
    if (rc == FAILURE)
    {
        for (slot = 0; slot < MAX_MESSAGE_HANDLERS; ++slot)
        {
            if (c->messageHandlers[slot].topicFilter == NULL)
            {
                rc = SUCCESS;
                break;
            }
        }
    }

    /* slot equals MAX_MESSAGE_HANDLERS when neither search found anything */
    if (slot < MAX_MESSAGE_HANDLERS)
    {
        c->messageHandlers[slot].topicFilter = topicFilter;
        c->messageHandlers[slot].fp = messageHandler;
    }

    return rc;
}
515 
516 
/**
 * Subscribe to a topic filter, wait for the SUBACK and report the granted QoS.
 *
 * The message handler is registered only when the SUBACK could be parsed and
 * the granted QoS is not 0x80. When MQTT_TASK is defined, the client's mutex
 * is held for the duration of the call. The session is closed whenever the
 * result is FAILURE.
 *
 * @param c               client to use; must be connected
 * @param topicFilter     filter to subscribe to
 * @param qos             requested quality of service
 * @param messageHandler  callback for messages matching the filter
 * @param data            receives the granted QoS from the SUBACK
 * @return the result of MQTTSetMessageHandler() when the handler was
 *         registered; SUCCESS when the SUBACK carried 0x80 (check
 *         data->grantedQoS); the result of sendPacket() if sending failed;
 *         FAILURE otherwise
 */
int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
       messageHandler messageHandler, MQTTSubackData* data)
{
    int rc = FAILURE;
    Timer timer;
    int len = 0;
    /* The serialiser and deserialiser work on int objects. An enum is not
     * guaranteed to have the size of an int, so go through real ints instead
     * of casting the address of an enum. */
    int requested_qos = (int)qos;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;

#if defined(MQTT_TASK)
    MutexLock(&c->mutex);
#endif
    if (!c->isconnected)
        goto exit;

    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, &requested_qos);
    if (len <= 0)
        goto exit;
    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) /* send the subscribe packet */
        goto exit;             /* there was a problem */

    if (waitfor(c, SUBACK, &timer) == SUBACK)      /* wait for suback */
    {
        int count = 0;
        int granted_qos = QOS0;
        unsigned short mypacketid;
        int parsed;

        data->grantedQoS = QOS0;
        parsed = MQTTDeserialize_suback(&mypacketid, 1, &count, &granted_qos, c->readbuf, c->readbuf_size);
        data->grantedQoS = (enum QoS)granted_qos;

        if (parsed != 1)
            rc = FAILURE;   /* an unparsable SUBACK is not a successful subscribe */
        else if (granted_qos != 0x80)
            rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
    }
    else
        rc = FAILURE;

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);
#if defined(MQTT_TASK)
    MutexUnlock(&c->mutex);
#endif
    return rc;
}
563 
564 
/**
 * Subscribe to a topic filter, discarding the details of the SUBACK.
 *
 * @param c               client to use
 * @param topicFilter     filter to subscribe to
 * @param qos             requested quality of service
 * @param messageHandler  callback for messages matching the filter
 * @return the result of MQTTSubscribeWithResults()
 */
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
       messageHandler messageHandler)
{
    MQTTSubackData unused_suback;
    int rc = MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &unused_suback);

    return rc;
}
571 
572 
/**
 * Unsubscribe from a topic filter and wait for the UNSUBACK.
 *
 * Once the UNSUBACK has been parsed, any message handler registered for the
 * filter is removed. When MQTT_TASK is defined, the client's mutex is held
 * for the duration of the call. The session is closed whenever the result is
 * FAILURE.
 *
 * @param c            client to use; must be connected
 * @param topicFilter  filter to unsubscribe from
 * @return SUCCESS when the UNSUBACK was received and parsed; the result of
 *         sendPacket() if sending failed; FAILURE otherwise
 */
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
{
    int rc = FAILURE;
    Timer timer;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;
    int len = 0;

#if defined(MQTT_TASK)
    MutexLock(&c->mutex);
#endif
    if (!c->isconnected)
        goto exit;

    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
        goto exit;
    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) /* send the unsubscribe packet */
        goto exit; /* there was a problem */

    if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
    {
        unsigned short mypacketid;  /* should be the same as the packetid above */
        if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
        {
            /* remove the subscription message handler associated with this topic, if there is one */
            MQTTSetMessageHandler(c, topicFilter, NULL);
        }
        else
            rc = FAILURE;   /* an unparsable UNSUBACK is not a successful unsubscribe */
    }
    else
        rc = FAILURE;

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);
#if defined(MQTT_TASK)
    MutexUnlock(&c->mutex);
#endif
    return rc;
}
615 
616 
/**
 * Publish a message and, for QoS 1 and QoS 2, wait for the final
 * acknowledgement (PUBACK or PUBCOMP respectively).
 *
 * For QoS 1 and QoS 2 a fresh packet identifier is written to message->id.
 * When MQTT_TASK is defined, the client's mutex is held for the duration of
 * the call. The session is closed whenever the result is FAILURE.
 *
 * @param c          client to use; must be connected
 * @param topicName  topic to publish to
 * @param message    message to publish
 * @return SUCCESS when the packet was sent and, where required, acknowledged;
 *         the result of sendPacket() if sending failed; FAILURE otherwise
 */
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
    int rc = FAILURE;
    int packet_len = 0;
    Timer timer;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicName;

#if defined(MQTT_TASK)
    MutexLock(&c->mutex);
#endif
    if (!c->isconnected)
        goto exit;

    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    if (message->qos == QOS1 || message->qos == QOS2)
        message->id = getNextPacketId(c);

    packet_len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
              topic, (unsigned char*)message->payload, message->payloadlen);
    if (packet_len <= 0)
        goto exit;

    rc = sendPacket(c, packet_len, &timer); /* send the publish packet */
    if (rc != SUCCESS)
        goto exit; /* there was a problem */

    if (message->qos == QOS1 || message->qos == QOS2)
    {
        /* QoS 1 completes with PUBACK, QoS 2 completes with PUBCOMP */
        const int expected_ack = (message->qos == QOS1) ? PUBACK : PUBCOMP;

        if (waitfor(c, expected_ack, &timer) != expected_ack)
            rc = FAILURE;
        else
        {
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
                rc = FAILURE;
        }
    }

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);
#if defined(MQTT_TASK)
    MutexUnlock(&c->mutex);
#endif
    return rc;
}
677 
678 
/**
 * Send a DISCONNECT packet and close the session.
 *
 * The session is closed whether or not the packet could be sent. When
 * MQTT_TASK is defined, the client's mutex is held for the duration of the
 * call.
 *
 * @param c  client to disconnect
 * @return the result of sendPacket(), or FAILURE if the packet could not be serialised
 */
int MQTTDisconnect(MQTTClient* c)
{
    Timer timer;     /* we might wait for incomplete incoming publishes to complete */
    int packet_len = 0;
    int rc = FAILURE;

#if defined(MQTT_TASK)
    MutexLock(&c->mutex);
#endif
    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    packet_len = MQTTSerialize_disconnect(c->buf, c->buf_size);
    if (packet_len > 0)
    {
        rc = sendPacket(c, packet_len, &timer);     /* send the disconnect packet */
    }
    MQTTCloseSession(c);

#if defined(MQTT_TASK)
    MutexUnlock(&c->mutex);
#endif
    return rc;
}
701