From bb2991cb2dac40607ad5617925bd01c200f09b0c Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Thu, 28 Dec 2023 15:52:08 +0700 Subject: [PATCH] working mqtt implementation --- .../lib/ESPMegaPRO/DigitalOutputIoT.cpp | 11 +- .../lib/ESPMegaPRO/ESPMegaIoT.cpp | 199 ++++++++++++------ .../lib/ESPMegaPRO/ESPMegaIoT.hpp | 2 + .../lib/ESPMegaPRO/ExpansionCard.hpp | 6 +- .../lib/ESPMegaPRO/IoTComponent.cpp | 3 +- Template Project/src/iotdemo.cpp | 4 +- 6 files changed, 149 insertions(+), 76 deletions(-) diff --git a/Template Project/lib/ESPMegaPRO/DigitalOutputIoT.cpp b/Template Project/lib/ESPMegaPRO/DigitalOutputIoT.cpp index 7221bd7..568f928 100644 --- a/Template Project/lib/ESPMegaPRO/DigitalOutputIoT.cpp +++ b/Template Project/lib/ESPMegaPRO/DigitalOutputIoT.cpp @@ -11,6 +11,8 @@ DigitalOutputIoT::~DigitalOutputIoT() { } bool DigitalOutputIoT::begin(uint8_t card_id, ExpansionCard *card, PubSubClient *mqtt, char *base_topic) { + Serial.print("Initializing digital output IoT component for card "); + Serial.println(card_id); this->mqtt = mqtt; this->base_topic = base_topic; this->card = (DigitalOutputCard *) card; @@ -171,22 +173,27 @@ uint8_t DigitalOutputIoT::getType() { } void DigitalOutputIoT::subscribe() { + Serial.println("Subscribing to digital output topics"); + char topic[10]; + Serial.println("Subscribe to all set state topics"); // Subscribe to all set state topics for(int i = 1; i <= 16; i++) { - char topic[10]; sprintf(topic, "%02d/set/state", i); subscribeRelative(topic); } + Serial.println("Subscribe to all set value topics"); // Subscribe to all set value topics for(int i = 1; i <= 16; i++) { - char topic[10]; sprintf(topic, "%02d/set/value", i); subscribeRelative(topic); } + Serial.println("Subscribe to request state topic"); // Subscribe to request state topic subscribeRelative(REQUEST_STATE_TOPIC); + Serial.println("Subscribe to publish enable topic"); // Subscribe to publish enable topic subscribeRelative(PUBLISH_ENABLE_TOPIC); + Serial.println("Subscriptions complete"); } void DigitalOutputIoT::loop() { diff --git a/Template Project/lib/ESPMegaPRO/ESPMegaIoT.cpp b/Template Project/lib/ESPMegaPRO/ESPMegaIoT.cpp index b09235c..4a77bff 100644 --- a/Template Project/lib/ESPMegaPRO/ESPMegaIoT.cpp +++ b/Template Project/lib/ESPMegaPRO/ESPMegaIoT.cpp @@ -1,55 +1,66 @@ #include -ESPMegaIoT::ESPMegaIoT() : mqtt(tcpClient) { +ESPMegaIoT::ESPMegaIoT() : mqtt(tcpClient) +{ tcpClient.setTimeout(1); // Initialize the components array - for (int i = 0; i < 255; i++) { + for (int i = 0; i < 255; i++) + { components[i] = NULL; } active = false; mqtt_connected = false; } -void ESPMegaIoT::mqttCallback(char *topic, byte *payload, unsigned int length) { +void ESPMegaIoT::mqttCallback(char *topic, byte *payload, unsigned int length) +{ // Create a null terminated string from the payload memcpy(payload_buffer, payload, length); payload_buffer[length] = '\0'; // Remove the base topic from the topic char *topic_without_base = topic + strlen(base_topic) + 1; + if (user_relative_mqtt_callback != NULL) + { + user_relative_mqtt_callback(topic_without_base + 3, payload_buffer); + } + if (user_mqtt_callback != NULL) + { + user_mqtt_callback(topic, payload_buffer); + } // Call the respective card's mqtt callback // Note that after the base topic, there should be the card id - // /base_topic/card_id/... + // /base_topic/card_id/... // First, get the card id in integer form char *card_id_str = strtok(topic_without_base, "/"); uint8_t card_id = atoi(card_id_str); // Check if the card is registered - if (components[card_id] == NULL) { + if (components[card_id] == NULL) + { return; } components[card_id]->handleMqttMessage(topic_without_base + 3, payload_buffer); - if (user_relative_mqtt_callback != NULL) { - user_relative_mqtt_callback(topic_without_base + 3, payload_buffer); - } - if (user_mqtt_callback != NULL) { - user_mqtt_callback(topic, payload_buffer); - } } -void ESPMegaIoT::setBaseTopic(char *base_topic) { +void ESPMegaIoT::setBaseTopic(char *base_topic) +{ strcpy(this->base_topic, base_topic); base_topic_length = strlen(base_topic); } -void ESPMegaIoT::intr_begin(ExpansionCard *cards[]) { +void ESPMegaIoT::intr_begin(ExpansionCard *cards[]) +{ this->cards = cards; active = true; } -void ESPMegaIoT::loop() { - if(!active) +void ESPMegaIoT::loop() +{ + if (!active) return; // Call each component's loop function - for (int i = 0; i < 255; i++) { - if (components[i] != NULL) { + for (int i = 0; i < 255; i++) + { + if (components[i] != NULL) + { components[i]->loop(); } } @@ -58,78 +69,100 @@ void ESPMegaIoT::loop() { } // Register Existing Card for use with IoT -void ESPMegaIoT::registerCard(uint8_t card_id) { +void ESPMegaIoT::registerCard(uint8_t card_id) +{ // Check if the card is already registered - if (components[card_id] != NULL) { + if (components[card_id] != NULL) + { return; } // Get the card type uint8_t card_type = cards[card_id]->getType(); // Create the respective IoT component - switch (card_type) { - case CARD_TYPE_ANALOG: - components[card_id] = new AnalogIoT(); - components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); - break; - case CARD_TYPE_DIGITAL_INPUT: - components[card_id] = new DigitalInputIoT(); - components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); - break; - case CARD_TYPE_DIGITAL_OUTPUT: - components[card_id] = new DigitalOutputIoT(); - components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); - break; - default: - return; + switch (card_type) + { + // case CARD_TYPE_ANALOG: + // components[card_id] = new AnalogIoT(); + // components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); + // break; + // case CARD_TYPE_DIGITAL_INPUT: + // components[card_id] = new DigitalInputIoT(); + // components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); + // break; + case CARD_TYPE_DIGITAL_OUTPUT: + components[card_id] = new DigitalOutputIoT(); + components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); + if (mqtt_connected) + { + components[card_id]->subscribe(); + Serial.pritnln("Back to register card"); + } + break; + default: + Serial.println("Invalid card type"); + return; } + Serial.println("Card registered"); } -void ESPMegaIoT::deregisterCard(uint8_t card_id) { +void ESPMegaIoT::deregisterCard(uint8_t card_id) +{ // Check if the card is registered - if (components[card_id] == NULL) { + if (components[card_id] == NULL) + { return; } // Delete the IoT component delete components[card_id]; components[card_id] = NULL; } -void ESPMegaIoT::publishCard(uint8_t card_id) { +void ESPMegaIoT::publishCard(uint8_t card_id) +{ // Check if the card is registered - if (components[card_id] == NULL) { + if (components[card_id] == NULL) + { return; } // Publish the card components[card_id]->publishReport(); } -void ESPMegaIoT::subscribeToTopic(char *topic) { +void ESPMegaIoT::subscribeToTopic(char *topic) +{ mqtt.subscribe(topic); } -void ESPMegaIoT::unsubscribeFromTopic(char *topic) { +void ESPMegaIoT::unsubscribeFromTopic(char *topic) +{ mqtt.unsubscribe(topic); } -void ESPMegaIoT::connectToWifi(char *ssid, char *password) { +void ESPMegaIoT::connectToWifi(char *ssid, char *password) +{ WiFi.begin(ssid, password); } -void ESPMegaIoT::connectToWifi(char *ssid) { +void ESPMegaIoT::connectToWifi(char *ssid) +{ WiFi.begin(ssid); } -void ESPMegaIoT::disconnectFromWifi() { +void ESPMegaIoT::disconnectFromWifi() +{ WiFi.disconnect(); } -bool ESPMegaIoT::wifiConnected() { +bool ESPMegaIoT::wifiConnected() +{ return WiFi.status() == WL_CONNECTED; } -bool ESPMegaIoT::connectToMqtt(char*client_id, char *mqtt_server, uint16_t mqtt_port, char *mqtt_user, char *mqtt_password) { +bool ESPMegaIoT::connectToMqtt(char *client_id, char *mqtt_server, uint16_t mqtt_port, char *mqtt_user, char *mqtt_password) +{ // Store mqtt connection parameters this->mqtt_server = mqtt_server; this->mqtt_port = mqtt_port; this->mqtt_user = mqtt_user; - this->mqtt_password = mqtt_password; - this-> mqtt_useauth = true; + this->mqtt_password = mqtt_password; + this->mqtt_useauth = true; this->client_id = client_id; mqtt.setServer(mqtt_server, mqtt_port); auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); mqtt.setCallback(boundCallback); - if (mqtt.connect(client_id, mqtt_user, mqtt_password)) { + if (mqtt.connect(client_id, mqtt_user, mqtt_password)) + { sessionKeepAlive(); mqttSubscribe(); mqtt_connected = true; @@ -138,7 +171,8 @@ bool ESPMegaIoT::connectToMqtt(char*client_id, char *mqtt_server, uint16_t mqtt_ mqtt_connected = false; return false; } -bool ESPMegaIoT::connectToMqtt(char* client_id, char *mqtt_server, uint16_t mqtt_port) { +bool ESPMegaIoT::connectToMqtt(char *client_id, char *mqtt_server, uint16_t mqtt_port) +{ // Store mqtt connection parameters Serial.println("Storing mqtt connection parameters"); this->mqtt_server = mqtt_server; @@ -151,76 +185,103 @@ bool ESPMegaIoT::connectToMqtt(char* client_id, char *mqtt_server, uint16_t mqtt auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); mqtt.setCallback(boundCallback); Serial.println("Connecting to mqtt"); - if(mqtt.connect(client_id)) { + if (mqtt.connect(client_id)) + { Serial.println("Connected to mqtt"); Serial.println("Calling session keep alive"); sessionKeepAlive(); Serial.println("Subscribing to topics"); mqttSubscribe(); - mqtt_connected = true; + mqtt_connected = true; return true; } Serial.println("Failed to connect to mqtt"); mqtt_connected = false; return false; - - } -void ESPMegaIoT::disconnectFromMqtt() { +void ESPMegaIoT::disconnectFromMqtt() +{ mqtt.disconnect(); } -void ESPMegaIoT::publishToTopic(char *topic, char *payload) { +void ESPMegaIoT::publishToTopic(char *topic, char *payload) +{ mqtt.publish(topic, payload); } -void ESPMegaIoT::registerMqttCallback(void (*callback)(char *, char *)) { +void ESPMegaIoT::registerMqttCallback(void (*callback)(char *, char *)) +{ + user_mqtt_callback = callback; } -void ESPMegaIoT::mqttSubscribe() { +void ESPMegaIoT::mqttSubscribe() +{ + Serial.println("MQTT Subscribe Activated"); + if (user_subscribe_callback != NULL) + { + user_subscribe_callback(); + } // Subscribe to all topics - for (int i = 0; i < 255; i++) { - if (components[i] != NULL) { + for (int i = 0; i < 255; i++) + { + if (components[i] != NULL) + { components[i]->subscribe(); } } } -void ESPMegaIoT::publishRelative(uint8_t card_id, char *topic, char *payload) { +void ESPMegaIoT::publishRelative(uint8_t card_id, char *topic, char *payload) +{ char absolute_topic[100]; sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic); mqtt.publish(absolute_topic, payload); } -bool ESPMegaIoT::mqttReconnect() { - if (mqtt_useauth) { +bool ESPMegaIoT::mqttReconnect() +{ + if (mqtt_useauth) + { return this->connectToMqtt(client_id, mqtt_server, mqtt_port, mqtt_user, mqtt_password); - } else { + } + else + { return this->connectToMqtt(client_id, mqtt_server, mqtt_port); } } -void ESPMegaIoT::sessionKeepAlive() { +void ESPMegaIoT::sessionKeepAlive() +{ static unsigned long lastSessionKeepAlive = 0; - if (millis() - lastSessionKeepAlive > 30000) { + if (millis() - lastSessionKeepAlive > 30000) + { lastSessionKeepAlive = millis(); // Check if mqtt is connected - if (!mqtt.connected()) { + if (!mqtt.connected()) + { // Try to reconnect mqtt_connected = mqttReconnect(); } } } -void ESPMegaIoT::registerRelativeMqttCallback(void (*callback)(char *, char *)) { +void ESPMegaIoT::registerRelativeMqttCallback(void (*callback)(char *, char *)) +{ user_relative_mqtt_callback = callback; } -void ESPMegaIoT::publishRelative(char *topic, char *payload) { +void ESPMegaIoT::publishRelative(char *topic, char *payload) +{ char absolute_topic[100]; sprintf(absolute_topic, "%s/%s", base_topic, topic); mqtt.publish(absolute_topic, payload); } -void ESPMegaIoT::subscribeRelative(char *topic) { +void ESPMegaIoT::subscribeRelative(char *topic) +{ char absolute_topic[100]; sprintf(absolute_topic, "%s/%s", base_topic, topic); mqtt.subscribe(absolute_topic); +} + +void ESPMegaIoT::registerSubscribeCallback(void (*callback)(void)) +{ + user_subscribe_callback = callback; } \ No newline at end of file diff --git a/Template Project/lib/ESPMegaPRO/ESPMegaIoT.hpp b/Template Project/lib/ESPMegaPRO/ESPMegaIoT.hpp index c0e33fd..845b952 100644 --- a/Template Project/lib/ESPMegaPRO/ESPMegaIoT.hpp +++ b/Template Project/lib/ESPMegaPRO/ESPMegaIoT.hpp @@ -34,6 +34,7 @@ public: void publishToTopic(char *topic, char *payload); void registerMqttCallback(void (*callback)(char *, char *)); void registerRelativeMqttCallback(void (*callback)(char *, char *)); + void registerSubscribeCallback(void (*callback)(void)); void setBaseTopic(char *base_topic); IPAddress getETHIp(); @@ -46,6 +47,7 @@ private: void mqttCallback(char *topic, byte *payload, unsigned int length); void (*user_mqtt_callback)(char *, char *); void (*user_relative_mqtt_callback)(char *, char *); + void (*user_subscribe_callback)(void); void publishRelative(uint8_t card_id, char *topic, char *payload); bool active; PubSubClient mqtt; diff --git a/Template Project/lib/ESPMegaPRO/ExpansionCard.hpp b/Template Project/lib/ESPMegaPRO/ExpansionCard.hpp index 9217c42..50ee940 100644 --- a/Template Project/lib/ESPMegaPRO/ExpansionCard.hpp +++ b/Template Project/lib/ESPMegaPRO/ExpansionCard.hpp @@ -5,9 +5,9 @@ class ExpansionCard { public: // Instantiate the card with the specified address ExpansionCard() {} - virtual bool begin() {} + virtual bool begin(); // Preform a loop to refresh the input buffers - virtual void loop() {} + virtual void loop(); // Get the card type - virtual uint8_t getType() {return 255;} + virtual uint8_t getType(); }; \ No newline at end of file diff --git a/Template Project/lib/ESPMegaPRO/IoTComponent.cpp b/Template Project/lib/ESPMegaPRO/IoTComponent.cpp index 3017e9f..b11a434 100644 --- a/Template Project/lib/ESPMegaPRO/IoTComponent.cpp +++ b/Template Project/lib/ESPMegaPRO/IoTComponent.cpp @@ -11,7 +11,8 @@ void IoTComponent::publishRelative(char *topic, char *payload) { } void IoTComponent::subscribeRelative(char *topic) { - char absolute_topic[100]; + Serial.println("Subscribe relative"); + char absolute_topic[50]; sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic); mqtt->subscribe(absolute_topic); } diff --git a/Template Project/src/iotdemo.cpp b/Template Project/src/iotdemo.cpp index 1c562c8..3734239 100644 --- a/Template Project/src/iotdemo.cpp +++ b/Template Project/src/iotdemo.cpp @@ -26,7 +26,6 @@ void setup() Serial.println("Setting static IP"); Serial.println("Connecting to Ethernet"); ETH.begin(); - delay(1000); ETH.config(ip, gateway, subnet); Serial.println("Static IP set"); Serial.println("Begin MQTT Initialization Routine"); @@ -41,6 +40,9 @@ void setup() Serial.println("Registering MQTT Callback"); espmega.iot.registerMqttCallback(mqtt_callback); Serial.println("ESPMega IoT initialized"); + Serial.println("Enable IoT Module for Digital Output Card"); + espmega.iot.registerCard(1); + espmega.iot.publishCard(1); } void loop()