Morse Micro IoT SDK  2.9.7
mqttdemo.c
Go to the documentation of this file.
1/*
2 * Copyright 2023 Morse Micro
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
96#include <string.h>
97#include "mmosal.h"
98#include "mmwlan.h"
99#include "mmconfig.h"
100#include "mmipal.h"
101#include "mbedtls/build_info.h"
102#include "mbedtls/platform.h"
103#include "mbedtls/net.h"
104#include "mbedtls/ssl.h"
105#include "mbedtls/entropy.h"
106#include "mbedtls/ctr_drbg.h"
107#include "mbedtls/debug.h"
108#include "core_mqtt.h"
109#include "mm_app_common.h"
110
116#define CLIENT_ID_PREFIX "MM_Client_%s"
117
123#define MQTT_BROKER_ENDPOINT "test.mosquitto.org"
124
128#define MQTT_BROKER_PORT 1883
129
130
132#define KEEP_ALIVE_TIMEOUT_SECONDS 60
134#define CONNACK_RECV_TIMEOUT_MS 10000
135
139#ifdef ipconfigSOCK_DEFAULT_RECEIVE_BLOCK_TIME
140 #define ipconfigSOCK_DEFAULT_RECEIVE_BLOCK_TIME (10000)
141#endif
142
148#define DELAY_BETWEEN_PUBLISHES 1000
149
151#define TOPIC_COUNT 1
152
154#define TOPIC_FORMAT "/MorseMicro/%s/topic"
155
157#define EXAMPLE_MESSAGE "G'day World!"
159#define MAC_ADDR_STR_LEN (18)
160
162static unsigned char buf[1024];
163
164
170static void MQTTProcessIncomingPublish(MQTTPublishInfo_t * pxPublishInfo)
171{
172 /* Strings are not zero terminated, so we need to explicitly copy and terminate them */
173 static char tmptopic[80];
174 static char tmppayload[128];
175 size_t topic_name_length;
176 size_t payload_length;
177
178 topic_name_length = pxPublishInfo->topicNameLength;
179 if (topic_name_length >= sizeof(tmptopic))
180 {
181 topic_name_length = sizeof(tmptopic) - 1;
182 }
183 strncpy(tmptopic, pxPublishInfo->pTopicName, topic_name_length);
184 tmptopic[topic_name_length] = '\0';
185
186 payload_length = pxPublishInfo->payloadLength;
187 if (payload_length >= sizeof(tmppayload))
188 {
189 payload_length = sizeof(tmppayload) - 1;
190 }
191 strncpy(tmppayload, (char*)pxPublishInfo->pPayload, payload_length);
192 tmppayload[payload_length] = '\0';
193
194 printf("Incoming Topic: %s\n"
195 "Incoming Message : %s\n",
196 tmptopic,
197 tmppayload);
198}
199
205static void MQTTProcessResponse(MQTTPacketInfo_t * pxIncomingPacket,
206 uint16_t usPacketId)
207{
208 MQTTStatus_t xResult = MQTTSuccess;
209 uint8_t * pucPayload = NULL;
210 size_t ulSize = 0;
211
212 (void) usPacketId;
213
214 switch (pxIncomingPacket->type)
215 {
216 case MQTT_PACKET_TYPE_SUBACK:
217 /* A SUBACK from the broker, containing the server response to our
218 * subscription request, has been received. It contains the status
219 * code indicating server approval/rejection for the subscription to
220 * the single topic requested. The SUBACK will be parsed to obtain
221 * the status code, and this status code will be stored in
222 * #xTopicFilterContext. */
223 xResult = MQTT_GetSubAckStatusCodes(pxIncomingPacket,
224 &pucPayload,
225 &ulSize);
226
227 /* MQTT_GetSubAckStatusCodes always returns success if called with
228 * packet info from the event callback and non-NULL parameters. */
229 MMOSAL_ASSERT(xResult == MQTTSuccess);
230 break;
231
232 case MQTT_PACKET_TYPE_UNSUBACK:
233 /* We should check which topic was unsubscribed to by looking at the packetid */
234 printf("Unsubscribed from requested topic\n");
235 break;
236
237 case MQTT_PACKET_TYPE_PINGRESP:
238 /* Nothing to be done from application as library handles
239 * PINGRESP with the use of MQTT_ProcessLoop API function. */
240 printf("WARNING: PINGRESP should not be handled by the application "
241 "callback when using MQTT_ProcessLoop.\n");
242 break;
243
244 /* Any other packet type is invalid. */
245 default:
246 printf("MQTTProcessResponse() called with unknown packet type:(%02X).\n",
247 pxIncomingPacket->type);
248 }
249}
250
257static void EventCallback(MQTTContext_t * pxMQTTContext,
258 MQTTPacketInfo_t * pxPacketInfo,
259 MQTTDeserializedInfo_t * pxDeserializedInfo)
260{
261 /* The MQTT context is not used for this demo. */
262 (void) pxMQTTContext;
263
264 if ((pxPacketInfo->type & 0xF0U) == MQTT_PACKET_TYPE_PUBLISH)
265 {
266 MQTTProcessIncomingPublish(pxDeserializedInfo->pPublishInfo);
267 }
268 else
269 {
270 MQTTProcessResponse(pxPacketInfo, pxDeserializedInfo->packetIdentifier);
271 }
272}
273
281MQTTStatus_t CreateMQTTConnectionToBroker(MQTTContext_t * pxMQTTContext,
282 NetworkContext_t * pxNetworkContext,
283 char * clientID)
284{
285 MQTTStatus_t xResult;
286 MQTTConnectInfo_t xConnectInfo;
287 bool xSessionPresent;
288 TransportInterface_t xTransport;
289 MQTTFixedBuffer_t xBuffer;
290
291 xBuffer.pBuffer = buf;
292 xBuffer.size = sizeof(buf);
293
294 /* Fill in Transport Interface send and receive function pointers. */
295 memset(&xTransport, 0, sizeof(xTransport));
296 xTransport.pNetworkContext = pxNetworkContext;
297 xTransport.send = transport_send;
298 xTransport.recv = transport_recv;
299
300 /* Initialize MQTT library. */
301 xResult = MQTT_Init(pxMQTTContext,
302 &xTransport,
305 &xBuffer);
306 if (xResult != MQTTSuccess)
307 {
308 return xResult;
309 }
310
311 /* Many fields not used in this demo so start with everything at 0. */
312 (void) memset((void *) &xConnectInfo, 0x00, sizeof(xConnectInfo));
313
314 /* Start with a clean session i.e. direct the MQTT broker to discard any
315 * previous session data. Also, establishing a connection with clean
316 * session will ensure that the broker does not store any data when this
317 * client gets disconnected. */
318 xConnectInfo.cleanSession = true;
319
320 /* The client identifier is used to uniquely identify this MQTT client to
321 * the MQTT broker. In a production device the identifier can be something
322 * unique, such as a device serial number. */
323 xConnectInfo.pClientIdentifier = clientID;
324 xConnectInfo.clientIdentifierLength = (uint16_t) strlen(clientID);
325
326 /* Set MQTT keep-alive period. It is the responsibility of the application
327 * to ensure that the interval between Control Packets being sent does not
328 * exceed the Keep Alive value. In the absence of sending any other
329 * Control Packets, the Client MUST send a PINGREQ Packet. */
330 xConnectInfo.keepAliveSeconds = KEEP_ALIVE_TIMEOUT_SECONDS;
331
332 /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it
333 * is passed as NULL. */
334 xResult = MQTT_Connect(pxMQTTContext,
335 &xConnectInfo,
336 NULL,
338 &xSessionPresent);
339 return xResult;
340}
341
348MQTTStatus_t MQTTSubscribe(MQTTContext_t * pxMQTTContext,
349 const char * topic)
350{
351 MQTTStatus_t xResult = MQTTSuccess;
352 MQTTSubscribeInfo_t xMQTTSubscription[ TOPIC_COUNT ];
353 uint16_t usSubscribePacketIdentifier;
354
355 /* Some fields not used by this demo so start with everything at 0. */
356 (void) memset((void *) &xMQTTSubscription, 0x00, sizeof(xMQTTSubscription));
357
358 /* Each packet requires a unique ID. */
359 usSubscribePacketIdentifier = MQTT_GetPacketId(pxMQTTContext);
360
361 /* Subscribe to the pcExampleTopic topic filter. This example subscribes
362 * to only one topic and uses QoS0. */
363 xMQTTSubscription[ 0 ].qos = MQTTQoS0;
364 xMQTTSubscription[ 0 ].pTopicFilter = topic;
365 xMQTTSubscription[ 0 ].topicFilterLength = strlen(topic);
366
367 /* The client is already connected to the broker. Subscribe to the topic
368 * as specified in pcExampleTopic by sending a subscribe packet then
369 * waiting for a subscribe acknowledgment (SUBACK). */
370 xResult = MQTT_Subscribe(pxMQTTContext,
371 xMQTTSubscription,
372 1, /* Only subscribing to one topic. */
373 usSubscribePacketIdentifier);
374 if (xResult != MQTTSuccess)
375 {
376 return xResult;
377 }
378
379 /* Process incoming packet from the broker. After sending the
380 * subscribe, the client may receive a publish before it receives a
381 * subscribe ack. Therefore, call generic incoming packet processing
382 * function. Since this demo is subscribing to the topic to which no
383 * one is publishing, probability of receiving Publish message before
384 * subscribe ack is zero; but application must be ready to receive any
385 * packet. This demo uses the generic packet processing function
386 * everywhere to highlight this fact. Note there is a separate demo that
387 * shows how to use coreMQTT in a thread safe way – in which case the
388 * MQTT protocol runs in the background and this call is not required. */
389 xResult = MQTT_ProcessLoop(pxMQTTContext);
390 return xResult;
391}
392
399MQTTStatus_t MQTTUnsubscribeFromTopic(MQTTContext_t * pxMQTTContext,
400 const char * topic)
401{
402 MQTTStatus_t xResult;
403 MQTTSubscribeInfo_t xMQTTSubscription[ TOPIC_COUNT ];
404 uint16_t usUnsubscribePacketIdentifier;
405
406 /* Some fields not used by this demo so start with everything at 0. */
407 (void) memset((void *) &xMQTTSubscription, 0x00, sizeof(xMQTTSubscription));
408
409 /* Subscribe to the pcExampleTopic topic filter. This example subscribes
410 * to only one topic and uses QoS0. */
411 xMQTTSubscription[ 0 ].qos = MQTTQoS0;
412 xMQTTSubscription[ 0 ].pTopicFilter = topic;
413 xMQTTSubscription[ 0 ].topicFilterLength = (uint16_t) strlen(
414 topic);
415
416 /* Each packet requires a unique ID. */
417 usUnsubscribePacketIdentifier = MQTT_GetPacketId(pxMQTTContext);
418
419 /* Send UNSUBSCRIBE packet. */
420 xResult = MQTT_Unsubscribe(pxMQTTContext,
421 xMQTTSubscription,
422 sizeof(xMQTTSubscription) / sizeof(
423 MQTTSubscribeInfo_t),
424 usUnsubscribePacketIdentifier);
425
426 return xResult;
427}
428
437MQTTStatus_t MQTTPublishToTopic(MQTTContext_t * pxMQTTContext,
438 const char * topic,
439 void * payload, size_t payloadLength)
440{
441 MQTTStatus_t xResult;
442 MQTTPublishInfo_t xMQTTPublishInfo;
443
444 /* Some fields are not used by this demo so start with everything at 0. */
445 (void) memset((void *) &xMQTTPublishInfo, 0x00, sizeof(xMQTTPublishInfo));
446
447 /* This demo uses QoS0. */
448 xMQTTPublishInfo.qos = MQTTQoS0;
449 xMQTTPublishInfo.retain = false;
450 xMQTTPublishInfo.pTopicName = topic;
451 xMQTTPublishInfo.topicNameLength = (uint16_t) strlen(topic);
452 xMQTTPublishInfo.pPayload = payload;
453 xMQTTPublishInfo.payloadLength = payloadLength;
454
455 /* Send PUBLISH packet. Packet ID is not used for a QoS0 publish. */
456 xResult = MQTT_Publish(pxMQTTContext, &xMQTTPublishInfo, 0U);
457 return xResult;
458}
459
464void app_init(void)
465{
466 printf("\n\nMorse MQTT Demo (Built " __DATE__ " " __TIME__ ")\n\n");
467
468 /* Initialize and connect to Wi-Fi, blocks till connected */
471
472 uint32_t ulPublishCount = 0U;
473 const uint32_t ulMaxPublishCount = 5UL;
474 NetworkContext_t xNetworkContext = { 0 };
475 MQTTContext_t xMQTTContext;
476 MQTTStatus_t xMQTTStatus;
477 TransportStatus_t xNetworkStatus;
478
479 /* Save space on stack by allocating static, no need to make this global */
480 static char client_id[48];
481 static char topic[80];
482 static char server[80];
483 static char message[80];
484 uint32_t port = MQTT_BROKER_PORT;
485 uint32_t publish_delay = DELAY_BETWEEN_PUBLISHES;
486
487 /* Generate Client ID & topic from MAC */
488 uint8_t mac_addr[MMWLAN_MAC_ADDR_LEN] = { 0 };
489 char mac_address_str[MAC_ADDR_STR_LEN];
490 enum mmwlan_status status = mmwlan_get_mac_addr(mac_addr);
491 if (status != MMWLAN_SUCCESS)
492 {
493 printf("Failed to read MAC address (status code %d)\n", status);
494 return;
495 }
496 snprintf(mac_address_str, sizeof(mac_address_str), "%02x:%02x:%02x:%02x:%02x:%02x",
497 mac_addr[0], mac_addr[1], mac_addr[2], mac_addr[3], mac_addr[4], mac_addr[5]);
498 snprintf(client_id, sizeof(client_id), CLIENT_ID_PREFIX, mac_address_str);
499 snprintf(topic, sizeof(topic), TOPIC_FORMAT, client_id);
500
501 /* Read from config store */
502 (void)mmconfig_read_string("mqtt.clientid", client_id, sizeof(client_id));
503 (void)mmconfig_read_string("mqtt.topic", topic, sizeof(topic));
504 (void)mmconfig_read_uint32("mqtt.port", &port);
505 (void)mmconfig_read_uint32("mqtt.publish_delay", &publish_delay);
506
507 strncpy(server, MQTT_BROKER_ENDPOINT, sizeof(server));
508 (void)mmconfig_read_string("mqtt.server", server, sizeof(server));
509
510 strncpy(message, EXAMPLE_MESSAGE, sizeof(message));
511 (void)mmconfig_read_string("mqtt.message", message, sizeof(message));
512
513 /*************************** Connect. *********************************/
514
515 /* Attempt to connect to the MQTT broker. The socket is returned in
516 * the network context structure. We set NetworkCredentials to NULL to connect in the clear.
517 * Set this parameter if you wish to connect with TLS */
518 printf("Connecting to server socket on %s:%ld...", server, port);
519 xNetworkStatus = transport_connect(&xNetworkContext, server, (uint16_t) port, NULL);
520 if (xNetworkStatus != TRANSPORT_SUCCESS)
521 {
522 printf("failed with code %d\n", xNetworkStatus);
523 return;
524 }
525 printf("ok\n");
526
527 /* Connect to the MQTT broker using the already connected TCP socket. */
528 printf("Client %s Creating MQTT connection with broker....", client_id);
529 xMQTTStatus = CreateMQTTConnectionToBroker(&xMQTTContext, &xNetworkContext, client_id);
530 if (xMQTTStatus != MQTTSuccess)
531 {
532 printf("failed with code %d\n", xMQTTStatus);
533 transport_disconnect(&xNetworkContext);
534 return;
535 }
536 printf("ok\n");
537
538 /**************************** Subscribe. ******************************/
539
540 /* Subscribe to the test topic. */
541 printf("Subscribing to topic %s...", topic);
542 xMQTTStatus = MQTTSubscribe(&xMQTTContext, topic);
543 if (xMQTTStatus != MQTTSuccess)
544 {
545 printf("failed with code %d\n", xMQTTStatus);
546 goto quit;
547 }
548 printf("ok\n");
549
550 /******************* Publish and Keep Alive Loop. *********************/
551
552 /* Publish messages with QoS0, then send and process Keep Alive messages. */
553 for (ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++)
554 {
555 printf("Publishing to topic %s...", topic);
556 xMQTTStatus = MQTTPublishToTopic(&xMQTTContext, topic,
557 message, strlen(message));
558 if (xMQTTStatus != MQTTSuccess)
559 {
560 printf("failed with code %d\n", xMQTTStatus);
561 goto quit;
562 }
563 printf("ok\n");
564
565 /* Process the incoming publish echo. Since the application subscribed
566 * to the same topic, the broker will send the same publish message
567 * back to the application. Note there is a separate demo that
568 * shows how to use coreMQTT in a thread safe way - in which case the
569 * MQTT protocol runs in the background and this call is not
570 * required. */
571 xMQTTStatus = MQTT_ProcessLoop(&xMQTTContext);
572 if (xMQTTStatus != MQTTSuccess)
573 {
574 printf("MQTT_ProcessLoop() failed with code %d\n", xMQTTStatus);
575 goto quit;
576 }
577
578 /* Leave the connection idle for some time. */
579 mmosal_task_sleep(publish_delay);
580 }
581
582 /******************** Unsubscribe from the topic. *********************/
583
584 xMQTTStatus = MQTTUnsubscribeFromTopic(&xMQTTContext, topic);
585 if (xMQTTStatus != MQTTSuccess)
586 {
587 printf("MQTTUnsubscribeFromTopic() failed with code %d\n", xMQTTStatus);
588 goto quit;
589 }
590
591 /* Process the incoming packet from the broker. Note there is a separate
592 * demo that shows how to use coreMQTT in a thread safe way - in which case
593 * the MQTT protocol runs in the background and this call is not required. */
594 xMQTTStatus = MQTT_ProcessLoop(&xMQTTContext);
595 if (xMQTTStatus != MQTTSuccess)
596 {
597 printf("MQTT_ProcessLoop() failed with code %d\n", xMQTTStatus);
598 goto quit;
599 }
600
601 /**************************** Disconnect. *****************************/
602
603quit:
604 /* Disconnect from broker. */
605 printf("Disconnecting from server and closing socket.\n");
606 xMQTTStatus = MQTT_Disconnect(&xMQTTContext);
607 if (xMQTTStatus != MQTTSuccess)
608 {
609 printf("MQTT_Disconnect() failed with code %d\n", xMQTTStatus);
610 }
611
612 /* Close the network connection. */
613 transport_disconnect(&xNetworkContext);
614}
int mmconfig_read_string(const char *key, char *buffer, int bufsize)
Returns the persistent store string value identified by the key.
int mmconfig_read_uint32(const char *key, uint32_t *value)
Returns the unsigned integer stored in persistent store identified by the key.
#define MMOSAL_ASSERT(expr)
Assert that the given expression evaluates to true and abort execution if not.
Definition: mmosal.h:927
void mmosal_task_sleep(uint32_t duration_ms)
Sleep for a period of time, yielding during that time.
uint32_t mmosal_get_time_ms(void)
Get the system time in milliseconds.
enum mmwlan_status mmwlan_get_mac_addr(uint8_t *mac_addr)
Gets the MAC address of this device.
mmwlan_status
Enumeration of status return codes.
Definition: mmwlan.h:50
#define MMWLAN_MAC_ADDR_LEN
Length of a WLAN MAC address.
Definition: mmwlan.h:88
@ MMWLAN_SUCCESS
The operation was successful.
Definition: mmwlan.h:52
Morse Micro application helper routines for initializing/de-initializing the Wireless LAN interface a...
void app_wlan_init(void)
Initializes the WLAN interface (and dependencies) using settings specified in the config store.
void app_wlan_start(void)
Starts the WLAN interface and connects to Wi-Fi using settings specified in the config store.
#define MAC_ADDR_STR_LEN
Length of MAC address string (i.e., "XX:XX:XX:XX:XX:XX") including terminator.
Definition: mqttdemo.c:159
#define EXAMPLE_MESSAGE
Message to publish/subscribe.
Definition: mqttdemo.c:157
MQTTStatus_t MQTTSubscribe(MQTTContext_t *pxMQTTContext, const char *topic)
Subscribes to the specified topic.
Definition: mqttdemo.c:348
MQTTStatus_t MQTTUnsubscribeFromTopic(MQTTContext_t *pxMQTTContext, const char *topic)
Unsubscribes from the specified topic.
Definition: mqttdemo.c:399
#define DELAY_BETWEEN_PUBLISHES
Override the default FreeRTOS + TCP receive socket timeouts, as the test server can be slow to respon...
Definition: mqttdemo.c:148
static void MQTTProcessIncomingPublish(MQTTPublishInfo_t *pxPublishInfo)
This callback gets called when a published message matches one of our subscribed topics.
Definition: mqttdemo.c:170
MQTTStatus_t MQTTPublishToTopic(MQTTContext_t *pxMQTTContext, const char *topic, void *payload, size_t payloadLength)
Publish a message to the specified MQTT topic.
Definition: mqttdemo.c:437
static void EventCallback(MQTTContext_t *pxMQTTContext, MQTTPacketInfo_t *pxPacketInfo, MQTTDeserializedInfo_t *pxDeserializedInfo)
This is a callback from MQTT_Process whenever a packet is received from the server.
Definition: mqttdemo.c:257
#define TOPIC_COUNT
Number of topics we subscribe to.
Definition: mqttdemo.c:151
#define KEEP_ALIVE_TIMEOUT_SECONDS
Keep alive Delay.
Definition: mqttdemo.c:132
MQTTStatus_t CreateMQTTConnectionToBroker(MQTTContext_t *pxMQTTContext, NetworkContext_t *pxNetworkContext, char *clientID)
Initializes an MQTT connection with the server.
Definition: mqttdemo.c:281
static unsigned char buf[1024]
Statically allocated buffer for MQTT.
Definition: mqttdemo.c:162
#define MQTT_BROKER_PORT
Broker port.
Definition: mqttdemo.c:128
#define CLIENT_ID_PREFIX
The MQTT client identifier used in this example.
Definition: mqttdemo.c:116
#define TOPIC_FORMAT
Topic to publish/subscribe, we include the client ID to keep it unique.
Definition: mqttdemo.c:154
#define MQTT_BROKER_ENDPOINT
Broker address to connect to.
Definition: mqttdemo.c:123
#define CONNACK_RECV_TIMEOUT_MS
Receive timeout.
Definition: mqttdemo.c:134
void app_init(void)
Main entry point to the application.
Definition: mqttdemo.c:464
static void MQTTProcessResponse(MQTTPacketInfo_t *pxIncomingPacket, uint16_t usPacketId)
This callback gets called whenever we receive an ACK from the server.
Definition: mqttdemo.c:205