working mqtt implementation

This commit is contained in:
Siwat Sirichai 2023-12-28 15:52:08 +07:00
parent 1163f2e30f
commit bb2991cb2d
6 changed files with 149 additions and 76 deletions

View File

@ -11,6 +11,8 @@ DigitalOutputIoT::~DigitalOutputIoT() {
} }
bool DigitalOutputIoT::begin(uint8_t card_id, ExpansionCard *card, PubSubClient *mqtt, char *base_topic) { bool DigitalOutputIoT::begin(uint8_t card_id, ExpansionCard *card, PubSubClient *mqtt, char *base_topic) {
Serial.print("Initializing digital output IoT component for card ");
Serial.println(card_id);
this->mqtt = mqtt; this->mqtt = mqtt;
this->base_topic = base_topic; this->base_topic = base_topic;
this->card = (DigitalOutputCard *) card; this->card = (DigitalOutputCard *) card;
@ -171,22 +173,27 @@ uint8_t DigitalOutputIoT::getType() {
} }
void DigitalOutputIoT::subscribe() { void DigitalOutputIoT::subscribe() {
Serial.println("Subscribing to digital output topics");
char topic[10];
Serial.println("Subscribe to all set state topics");
// Subscribe to all set state topics // Subscribe to all set state topics
for(int i = 1; i <= 16; i++) { for(int i = 1; i <= 16; i++) {
char topic[10];
sprintf(topic, "%02d/set/state", i); sprintf(topic, "%02d/set/state", i);
subscribeRelative(topic); subscribeRelative(topic);
} }
Serial.println("Subscribe to all set value topics");
// Subscribe to all set value topics // Subscribe to all set value topics
for(int i = 1; i <= 16; i++) { for(int i = 1; i <= 16; i++) {
char topic[10];
sprintf(topic, "%02d/set/value", i); sprintf(topic, "%02d/set/value", i);
subscribeRelative(topic); subscribeRelative(topic);
} }
Serial.println("Subscribe to request state topic");
// Subscribe to request state topic // Subscribe to request state topic
subscribeRelative(REQUEST_STATE_TOPIC); subscribeRelative(REQUEST_STATE_TOPIC);
Serial.println("Subscribe to publish enable topic");
// Subscribe to publish enable topic // Subscribe to publish enable topic
subscribeRelative(PUBLISH_ENABLE_TOPIC); subscribeRelative(PUBLISH_ENABLE_TOPIC);
Serial.println("Subscriptions complete");
} }
void DigitalOutputIoT::loop() { void DigitalOutputIoT::loop() {

View File

@ -1,55 +1,66 @@
#include <ESPMegaIoT.hpp> #include <ESPMegaIoT.hpp>
ESPMegaIoT::ESPMegaIoT() : mqtt(tcpClient) { ESPMegaIoT::ESPMegaIoT() : mqtt(tcpClient)
{
tcpClient.setTimeout(1); tcpClient.setTimeout(1);
// Initialize the components array // Initialize the components array
for (int i = 0; i < 255; i++) { for (int i = 0; i < 255; i++)
{
components[i] = NULL; components[i] = NULL;
} }
active = false; active = false;
mqtt_connected = false; mqtt_connected = false;
} }
void ESPMegaIoT::mqttCallback(char *topic, byte *payload, unsigned int length) { void ESPMegaIoT::mqttCallback(char *topic, byte *payload, unsigned int length)
{
// Create a null terminated string from the payload // Create a null terminated string from the payload
memcpy(payload_buffer, payload, length); memcpy(payload_buffer, payload, length);
payload_buffer[length] = '\0'; payload_buffer[length] = '\0';
// Remove the base topic from the topic // Remove the base topic from the topic
char *topic_without_base = topic + strlen(base_topic) + 1; char *topic_without_base = topic + strlen(base_topic) + 1;
if (user_relative_mqtt_callback != NULL)
{
user_relative_mqtt_callback(topic_without_base + 3, payload_buffer);
}
if (user_mqtt_callback != NULL)
{
user_mqtt_callback(topic, payload_buffer);
}
// Call the respective card's mqtt callback // Call the respective card's mqtt callback
// Note that after the base topic, there should be the card id // Note that after the base topic, there should be the card id
// /base_topic/card_id/... // /base_topic/card_id/...
// First, get the card id in integer form // First, get the card id in integer form
char *card_id_str = strtok(topic_without_base, "/"); char *card_id_str = strtok(topic_without_base, "/");
uint8_t card_id = atoi(card_id_str); uint8_t card_id = atoi(card_id_str);
// Check if the card is registered // Check if the card is registered
if (components[card_id] == NULL) { if (components[card_id] == NULL)
{
return; return;
} }
components[card_id]->handleMqttMessage(topic_without_base + 3, payload_buffer); components[card_id]->handleMqttMessage(topic_without_base + 3, payload_buffer);
if (user_relative_mqtt_callback != NULL) {
user_relative_mqtt_callback(topic_without_base + 3, payload_buffer);
}
if (user_mqtt_callback != NULL) {
user_mqtt_callback(topic, payload_buffer);
}
} }
void ESPMegaIoT::setBaseTopic(char *base_topic) { void ESPMegaIoT::setBaseTopic(char *base_topic)
{
strcpy(this->base_topic, base_topic); strcpy(this->base_topic, base_topic);
base_topic_length = strlen(base_topic); base_topic_length = strlen(base_topic);
} }
void ESPMegaIoT::intr_begin(ExpansionCard *cards[]) { void ESPMegaIoT::intr_begin(ExpansionCard *cards[])
{
this->cards = cards; this->cards = cards;
active = true; active = true;
} }
void ESPMegaIoT::loop() { void ESPMegaIoT::loop()
if(!active) {
if (!active)
return; return;
// Call each component's loop function // Call each component's loop function
for (int i = 0; i < 255; i++) { for (int i = 0; i < 255; i++)
if (components[i] != NULL) { {
if (components[i] != NULL)
{
components[i]->loop(); components[i]->loop();
} }
} }
@ -58,78 +69,100 @@ void ESPMegaIoT::loop() {
} }
// Register Existing Card for use with IoT // Register Existing Card for use with IoT
void ESPMegaIoT::registerCard(uint8_t card_id) { void ESPMegaIoT::registerCard(uint8_t card_id)
{
// Check if the card is already registered // Check if the card is already registered
if (components[card_id] != NULL) { if (components[card_id] != NULL)
{
return; return;
} }
// Get the card type // Get the card type
uint8_t card_type = cards[card_id]->getType(); uint8_t card_type = cards[card_id]->getType();
// Create the respective IoT component // Create the respective IoT component
switch (card_type) { switch (card_type)
case CARD_TYPE_ANALOG: {
components[card_id] = new AnalogIoT(); // case CARD_TYPE_ANALOG:
components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); // components[card_id] = new AnalogIoT();
break; // components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic);
case CARD_TYPE_DIGITAL_INPUT: // break;
components[card_id] = new DigitalInputIoT(); // case CARD_TYPE_DIGITAL_INPUT:
components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); // components[card_id] = new DigitalInputIoT();
break; // components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic);
case CARD_TYPE_DIGITAL_OUTPUT: // break;
components[card_id] = new DigitalOutputIoT(); case CARD_TYPE_DIGITAL_OUTPUT:
components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic); components[card_id] = new DigitalOutputIoT();
break; components[card_id]->begin(card_id, cards[card_id], &mqtt, base_topic);
default: if (mqtt_connected)
return; {
components[card_id]->subscribe();
Serial.pritnln("Back to register card");
}
break;
default:
Serial.println("Invalid card type");
return;
} }
Serial.println("Card registered");
} }
void ESPMegaIoT::deregisterCard(uint8_t card_id) { void ESPMegaIoT::deregisterCard(uint8_t card_id)
{
// Check if the card is registered // Check if the card is registered
if (components[card_id] == NULL) { if (components[card_id] == NULL)
{
return; return;
} }
// Delete the IoT component // Delete the IoT component
delete components[card_id]; delete components[card_id];
components[card_id] = NULL; components[card_id] = NULL;
} }
void ESPMegaIoT::publishCard(uint8_t card_id) { void ESPMegaIoT::publishCard(uint8_t card_id)
{
// Check if the card is registered // Check if the card is registered
if (components[card_id] == NULL) { if (components[card_id] == NULL)
{
return; return;
} }
// Publish the card // Publish the card
components[card_id]->publishReport(); components[card_id]->publishReport();
} }
void ESPMegaIoT::subscribeToTopic(char *topic) { void ESPMegaIoT::subscribeToTopic(char *topic)
{
mqtt.subscribe(topic); mqtt.subscribe(topic);
} }
void ESPMegaIoT::unsubscribeFromTopic(char *topic) { void ESPMegaIoT::unsubscribeFromTopic(char *topic)
{
mqtt.unsubscribe(topic); mqtt.unsubscribe(topic);
} }
void ESPMegaIoT::connectToWifi(char *ssid, char *password) { void ESPMegaIoT::connectToWifi(char *ssid, char *password)
{
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
} }
void ESPMegaIoT::connectToWifi(char *ssid) { void ESPMegaIoT::connectToWifi(char *ssid)
{
WiFi.begin(ssid); WiFi.begin(ssid);
} }
void ESPMegaIoT::disconnectFromWifi() { void ESPMegaIoT::disconnectFromWifi()
{
WiFi.disconnect(); WiFi.disconnect();
} }
bool ESPMegaIoT::wifiConnected() { bool ESPMegaIoT::wifiConnected()
{
return WiFi.status() == WL_CONNECTED; return WiFi.status() == WL_CONNECTED;
} }
bool ESPMegaIoT::connectToMqtt(char*client_id, char *mqtt_server, uint16_t mqtt_port, char *mqtt_user, char *mqtt_password) { bool ESPMegaIoT::connectToMqtt(char *client_id, char *mqtt_server, uint16_t mqtt_port, char *mqtt_user, char *mqtt_password)
{
// Store mqtt connection parameters // Store mqtt connection parameters
this->mqtt_server = mqtt_server; this->mqtt_server = mqtt_server;
this->mqtt_port = mqtt_port; this->mqtt_port = mqtt_port;
this->mqtt_user = mqtt_user; this->mqtt_user = mqtt_user;
this->mqtt_password = mqtt_password; this->mqtt_password = mqtt_password;
this-> mqtt_useauth = true; this->mqtt_useauth = true;
this->client_id = client_id; this->client_id = client_id;
mqtt.setServer(mqtt_server, mqtt_port); mqtt.setServer(mqtt_server, mqtt_port);
auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
mqtt.setCallback(boundCallback); mqtt.setCallback(boundCallback);
if (mqtt.connect(client_id, mqtt_user, mqtt_password)) { if (mqtt.connect(client_id, mqtt_user, mqtt_password))
{
sessionKeepAlive(); sessionKeepAlive();
mqttSubscribe(); mqttSubscribe();
mqtt_connected = true; mqtt_connected = true;
@ -138,7 +171,8 @@ bool ESPMegaIoT::connectToMqtt(char*client_id, char *mqtt_server, uint16_t mqtt_
mqtt_connected = false; mqtt_connected = false;
return false; return false;
} }
bool ESPMegaIoT::connectToMqtt(char* client_id, char *mqtt_server, uint16_t mqtt_port) { bool ESPMegaIoT::connectToMqtt(char *client_id, char *mqtt_server, uint16_t mqtt_port)
{
// Store mqtt connection parameters // Store mqtt connection parameters
Serial.println("Storing mqtt connection parameters"); Serial.println("Storing mqtt connection parameters");
this->mqtt_server = mqtt_server; this->mqtt_server = mqtt_server;
@ -151,76 +185,103 @@ bool ESPMegaIoT::connectToMqtt(char* client_id, char *mqtt_server, uint16_t mqtt
auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); auto boundCallback = std::bind(&ESPMegaIoT::mqttCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
mqtt.setCallback(boundCallback); mqtt.setCallback(boundCallback);
Serial.println("Connecting to mqtt"); Serial.println("Connecting to mqtt");
if(mqtt.connect(client_id)) { if (mqtt.connect(client_id))
{
Serial.println("Connected to mqtt"); Serial.println("Connected to mqtt");
Serial.println("Calling session keep alive"); Serial.println("Calling session keep alive");
sessionKeepAlive(); sessionKeepAlive();
Serial.println("Subscribing to topics"); Serial.println("Subscribing to topics");
mqttSubscribe(); mqttSubscribe();
mqtt_connected = true; mqtt_connected = true;
return true; return true;
} }
Serial.println("Failed to connect to mqtt"); Serial.println("Failed to connect to mqtt");
mqtt_connected = false; mqtt_connected = false;
return false; return false;
} }
void ESPMegaIoT::disconnectFromMqtt() { void ESPMegaIoT::disconnectFromMqtt()
{
mqtt.disconnect(); mqtt.disconnect();
} }
void ESPMegaIoT::publishToTopic(char *topic, char *payload) { void ESPMegaIoT::publishToTopic(char *topic, char *payload)
{
mqtt.publish(topic, payload); mqtt.publish(topic, payload);
} }
void ESPMegaIoT::registerMqttCallback(void (*callback)(char *, char *)) { void ESPMegaIoT::registerMqttCallback(void (*callback)(char *, char *))
{
user_mqtt_callback = callback;
} }
void ESPMegaIoT::mqttSubscribe() { void ESPMegaIoT::mqttSubscribe()
{
Serial.println("MQTT Subscribe Activated");
if (user_subscribe_callback != NULL)
{
user_subscribe_callback();
}
// Subscribe to all topics // Subscribe to all topics
for (int i = 0; i < 255; i++) { for (int i = 0; i < 255; i++)
if (components[i] != NULL) { {
if (components[i] != NULL)
{
components[i]->subscribe(); components[i]->subscribe();
} }
} }
} }
void ESPMegaIoT::publishRelative(uint8_t card_id, char *topic, char *payload) { void ESPMegaIoT::publishRelative(uint8_t card_id, char *topic, char *payload)
{
char absolute_topic[100]; char absolute_topic[100];
sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic); sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic);
mqtt.publish(absolute_topic, payload); mqtt.publish(absolute_topic, payload);
} }
bool ESPMegaIoT::mqttReconnect() { bool ESPMegaIoT::mqttReconnect()
if (mqtt_useauth) { {
if (mqtt_useauth)
{
return this->connectToMqtt(client_id, mqtt_server, mqtt_port, mqtt_user, mqtt_password); return this->connectToMqtt(client_id, mqtt_server, mqtt_port, mqtt_user, mqtt_password);
} else { }
else
{
return this->connectToMqtt(client_id, mqtt_server, mqtt_port); return this->connectToMqtt(client_id, mqtt_server, mqtt_port);
} }
} }
void ESPMegaIoT::sessionKeepAlive() { void ESPMegaIoT::sessionKeepAlive()
{
static unsigned long lastSessionKeepAlive = 0; static unsigned long lastSessionKeepAlive = 0;
if (millis() - lastSessionKeepAlive > 30000) { if (millis() - lastSessionKeepAlive > 30000)
{
lastSessionKeepAlive = millis(); lastSessionKeepAlive = millis();
// Check if mqtt is connected // Check if mqtt is connected
if (!mqtt.connected()) { if (!mqtt.connected())
{
// Try to reconnect // Try to reconnect
mqtt_connected = mqttReconnect(); mqtt_connected = mqttReconnect();
} }
} }
} }
void ESPMegaIoT::registerRelativeMqttCallback(void (*callback)(char *, char *)) { void ESPMegaIoT::registerRelativeMqttCallback(void (*callback)(char *, char *))
{
user_relative_mqtt_callback = callback; user_relative_mqtt_callback = callback;
} }
void ESPMegaIoT::publishRelative(char *topic, char *payload) { void ESPMegaIoT::publishRelative(char *topic, char *payload)
{
char absolute_topic[100]; char absolute_topic[100];
sprintf(absolute_topic, "%s/%s", base_topic, topic); sprintf(absolute_topic, "%s/%s", base_topic, topic);
mqtt.publish(absolute_topic, payload); mqtt.publish(absolute_topic, payload);
} }
void ESPMegaIoT::subscribeRelative(char *topic) { void ESPMegaIoT::subscribeRelative(char *topic)
{
char absolute_topic[100]; char absolute_topic[100];
sprintf(absolute_topic, "%s/%s", base_topic, topic); sprintf(absolute_topic, "%s/%s", base_topic, topic);
mqtt.subscribe(absolute_topic); mqtt.subscribe(absolute_topic);
}
void ESPMegaIoT::registerSubscribeCallback(void (*callback)(void))
{
user_subscribe_callback = callback;
} }

View File

@ -34,6 +34,7 @@ public:
void publishToTopic(char *topic, char *payload); void publishToTopic(char *topic, char *payload);
void registerMqttCallback(void (*callback)(char *, char *)); void registerMqttCallback(void (*callback)(char *, char *));
void registerRelativeMqttCallback(void (*callback)(char *, char *)); void registerRelativeMqttCallback(void (*callback)(char *, char *));
void registerSubscribeCallback(void (*callback)(void));
void setBaseTopic(char *base_topic); void setBaseTopic(char *base_topic);
IPAddress getETHIp(); IPAddress getETHIp();
@ -46,6 +47,7 @@ private:
void mqttCallback(char *topic, byte *payload, unsigned int length); void mqttCallback(char *topic, byte *payload, unsigned int length);
void (*user_mqtt_callback)(char *, char *); void (*user_mqtt_callback)(char *, char *);
void (*user_relative_mqtt_callback)(char *, char *); void (*user_relative_mqtt_callback)(char *, char *);
void (*user_subscribe_callback)(void);
void publishRelative(uint8_t card_id, char *topic, char *payload); void publishRelative(uint8_t card_id, char *topic, char *payload);
bool active; bool active;
PubSubClient mqtt; PubSubClient mqtt;

View File

@ -5,9 +5,9 @@ class ExpansionCard {
public: public:
// Instantiate the card with the specified address // Instantiate the card with the specified address
ExpansionCard() {} ExpansionCard() {}
virtual bool begin() {} virtual bool begin();
// Preform a loop to refresh the input buffers // Preform a loop to refresh the input buffers
virtual void loop() {} virtual void loop();
// Get the card type // Get the card type
virtual uint8_t getType() {return 255;} virtual uint8_t getType();
}; };

View File

@ -11,7 +11,8 @@ void IoTComponent::publishRelative(char *topic, char *payload) {
} }
void IoTComponent::subscribeRelative(char *topic) { void IoTComponent::subscribeRelative(char *topic) {
char absolute_topic[100]; Serial.println("Subscribe relative");
char absolute_topic[50];
sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic); sprintf(absolute_topic, "%s/%d/%s", base_topic, card_id, topic);
mqtt->subscribe(absolute_topic); mqtt->subscribe(absolute_topic);
} }

View File

@ -26,7 +26,6 @@ void setup()
Serial.println("Setting static IP"); Serial.println("Setting static IP");
Serial.println("Connecting to Ethernet"); Serial.println("Connecting to Ethernet");
ETH.begin(); ETH.begin();
delay(1000);
ETH.config(ip, gateway, subnet); ETH.config(ip, gateway, subnet);
Serial.println("Static IP set"); Serial.println("Static IP set");
Serial.println("Begin MQTT Initialization Routine"); Serial.println("Begin MQTT Initialization Routine");
@ -41,6 +40,9 @@ void setup()
Serial.println("Registering MQTT Callback"); Serial.println("Registering MQTT Callback");
espmega.iot.registerMqttCallback(mqtt_callback); espmega.iot.registerMqttCallback(mqtt_callback);
Serial.println("ESPMega IoT initialized"); Serial.println("ESPMega IoT initialized");
Serial.println("Enable IoT Module for Digital Output Card");
espmega.iot.registerCard(1);
espmega.iot.publishCard(1);
} }
void loop() void loop()