update Code

This commit is contained in:
Siwat Sirichai 2019-08-09 09:17:05 +07:00
parent 3c47103b39
commit a641fad56c
119 changed files with 10997 additions and 5 deletions

View file

@ -0,0 +1,409 @@
/*
* Copyright (C) 2018 Andreas Motzek andreas-motzek@t-online.de
*
* This file is part of the MQTT Client package.
*
* You can use, redistribute and/or modify this file under the terms of the Modified Artistic License.
* See http://simplysomethings.de/open+source/modified+artistic+license.html for details.
*
* This file is distributed in the hope that it will be useful, but without any warranty; without even
* the implied warranty of merchantability or fitness for a particular purpose.
*/
#include "MQTTClient.h"
MQTTClient* MQTTClient::current = NULL;
MQTTClient::MQTTClient(CooperativeMultitasking* _tasks, Client* _client, const char* _host, uint16_t _port, const char* _clientid, const char* _username, const char* _password, uint16_t _keepalive) {
tasks = _tasks;
client = _client;
host = strdup(_host);
port = _port;
clientid = strdup(_clientid);
username = strdupOrNull(_username);
password = strdupOrNull(_password);
keepalive = _keepalive;
isconnected = false;
head = NULL;
tail = NULL;
}
MQTTClient::~MQTTClient() {
free(host);
free(clientid);
free(username);
free(password);
host = NULL;
clientid = NULL;
username = NULL;
password = NULL;
//
while (head) {
PublishPacket* next = head->next;
free(head->payload);
delete head;
head = next;
}
//
tail = NULL;
//
if (current == this) current = NULL;
}
bool MQTTClient::connect() {
if (!current) {
if (client->connect(host, port)) {
if (sendConnectPacket()) {
current = this;
auto task1 = tasks->ifThen([] () -> bool { return current ? current->available() >= 4 : true; },
[] () -> void { if (current) current->receiveConnectAcknowledgementPacket(); });
auto task2 = tasks->after(10000, [] () -> void { if (current) current->stop(); });
tasks->onlyOneOf(task1, task2);
//
return true;
}
//
Serial.println("cannot send connect packet");
//
stop();
} else {
Serial.print("cannot connect to ");
Serial.print(host);
Serial.print(":");
Serial.println(port);
}
} else {
Serial.println("another mqtt client is connected");
}
//
return false;
}
bool MQTTClient::connected() {
return isconnected;
}
bool MQTTClient::publishAcknowledged() {
return !head;
}
bool MQTTClient::publish(bool retain, const char* topicname, const char* payload) {
PublishPacket* packet = new PublishPacket(); // std::nothrow is default
//
if (packet) {
packet->retain = retain;
packet->topicname = topicname;
packet->payload = strdup(payload);
enqueuePublishPacket(packet);
transmitPublishPacketsAfter(0);
//
return true;
}
//
Serial.println("cannot enqueue publish packet");
//
return false;
}
void MQTTClient::disconnect() {
if (isconnected) sendDisconnectPacket();
//
stop();
}
void MQTTClient::enqueuePublishPacket(PublishPacket* packet) {
uint16_t packetid = 1; // 2.3.1 non-zero 16-bit packetid
//
if (head) {
PublishPacket* packet = head;
//
while (packet) {
if (packet->packetid > packetid) packetid = packet->packetid;
//
packet = packet->next;
}
//
packetid++; // biggest packetid plus 1
}
//
packet->packetid = packetid;
packet->trycount = 0;
packet->next = head;
head = packet;
//
if (!tail) tail = head;
}
void MQTTClient::transmitPublishPacketsAfter(unsigned long duration) {
tasks->after(duration, [] () -> void { if (current) current->transmitPublishPackets(); });
}
void MQTTClient::transmitPublishPackets() {
if (isconnected && current == this && head) {
if (available() >= 4) {
receivePublishAcknowledgementPacket();
transmitPublishPacketsAfter(100);
//
return;
}
//
if (head->trycount >= 20) {
Serial.println("discarding packet");
//
removePublishPacket(head->packetid);
transmitPublishPacketsAfter(100);
//
return;
}
//
if (sendHeadPublishPacket()) {
head->trycount++;
rotatePublishPackets();
transmitPublishPacketsAfter(10000);
//
return;
}
//
Serial.println("cannot send publish packet");
//
stop();
}
}
void MQTTClient::removePublishPacket(uint16_t packetid) {
PublishPacket* last = NULL;
PublishPacket* packet = head;
//
while (packet) {
if (packet->packetid == packetid) {
if (last) last->next = packet->next;
//
if (packet == head) head = packet->next;
//
if (packet == tail) tail = last;
//
free(packet->payload);
delete packet;
//
return;
}
//
last = packet;
packet = packet->next;
}
}
void MQTTClient::rotatePublishPackets() {
if (!head || head == tail) return; // less than 2 packets
//
tail->next = head;
tail = head;
head = head->next;
tail->next = NULL;
}
bool MQTTClient::sendConnectPacket() {
int packetlength = 2 + 4 + 1 + 1 + 2 + 2 + strlen(clientid);
//
if (username != NULL) packetlength += (2 + strlen(username));
//
if (password != NULL) packetlength += (2 + strlen(password));
//
uint8_t connectflags = 2; // clean session
//
if (username != NULL) connectflags |= 128;
//
if (password != NULL) connectflags |= 64;
//
// Type, Flags, Packet Length
writeTypeFlags(1, 0); // connect, 0
writePacketLength(packetlength);
//
// Header
writeLengthString("MQTT"); // protocol name
writeByte(4); // protocol level
writeByte(connectflags);
writeShort(keepalive);
//
// Payload
writeLengthString(clientid);
//
if (username != NULL) writeLengthString(username);
//
if (password != NULL) writeLengthString(password);
//
flush();
//
return !getWriteError();
}
void MQTTClient::receiveConnectAcknowledgementPacket() {
uint8_t typeflags = readByte();
uint8_t packetlength = readByte();
uint8_t sessionpresent = readByte();
uint8_t returncode = readByte();
//
if (typeflags == (2 << 4) && packetlength == 2) {
switch (returncode) {
case 0: Serial.println("connection accepted"); isconnected = true; transmitPublishPacketsAfter(0); return;
case 1: Serial.println("unacceptable protocol version"); break;
case 2: Serial.println("identifier rejected"); break;
case 3: Serial.println("server unavailable"); break;
case 4: Serial.println("bad user name or password"); break;
case 5: Serial.println("not authorized"); break;
default: Serial.println(returncode); break;
}
} else {
Serial.println("not a connect acknowledgement");
}
//
stop();
}
bool MQTTClient::sendHeadPublishPacket() {
int packetlength = 2 + strlen(head->topicname) + 2 + strlen(head->payload);
uint8_t flags = 2; // QoS 1
//
if (head->trycount > 0) flags |= 8; // duplicate
//
if (head->retain) flags |= 1;
//
// Type, Flags, Packet Length
writeTypeFlags(3, flags); // publish, flags
writePacketLength(packetlength);
//
// Header
writeLengthString(head->topicname);
writeShort(head->packetid);
//
// Payload
writeString(head->payload, strlen(head->payload));
//
flush();
//
return !getWriteError();
}
void MQTTClient::receivePublishAcknowledgementPacket() {
uint8_t typeflags = readByte();
uint8_t packetlength = readByte();
uint16_t packetid = readShort();
//
if (typeflags == (4 << 4) && packetlength == 2) {
removePublishPacket(packetid);
//
Serial.println("publish acknowledged");
//
return;
}
//
Serial.println("not a publish acknowledgement");
disconnect();
}
void MQTTClient::sendDisconnectPacket() {
// Type, Flags, Packet Length
writeTypeFlags(14, 0); // disconnect, 0
writePacketLength(0);
//
flush();
}
void MQTTClient::writeTypeFlags(uint8_t type, uint8_t flags) {
writeByte(type << 4 | flags);
}
void MQTTClient::writePacketLength(int value) {
while (true) {
int digit = value & 127;
value >>= 7;
//
if (value > 0) {
writeByte(digit | 128);
} else {
writeByte(digit);
//
break;
}
}
}
void MQTTClient::writeLengthString(const char* value) {
size_t len = strlen(value);
//
if (len > 65535) return;
//
writeShort(len);
writeString(value, len);
}
void MQTTClient::writeString(const char* value, size_t len) {
client->write((uint8_t*) value, len);
}
void MQTTClient::writeShort(uint16_t value) {
writeByte(value >> 8);
writeByte(value & 255);
}
void MQTTClient::writeByte(uint8_t value) {
client->write(value);
}
void MQTTClient::flush() {
client->flush();
}
int MQTTClient::getWriteError() {
return client->getWriteError();
}
int MQTTClient::available() {
return client->available();
}
uint8_t MQTTClient::readByte() {
return client->read();
}
uint16_t MQTTClient::readShort() {
uint16_t value = client->read();
value <<= 8;
value += client->read();
//
return value;
}
void MQTTClient::stop() {
if (client->connected()) {
client->stop();
client->clearWriteError();
}
//
isconnected = false;
current = NULL;
}
char* MQTTClient::strdupOrNull(const char* string) {
if (string == NULL) return NULL;
//
return strdup(string);
}
/*
*
*/
MQTTTopic::MQTTTopic(MQTTClient* _client, const char* _topicname) {
client = _client;
topicname = strdup(_topicname);
}
MQTTTopic::~MQTTTopic() {
free(topicname);
topicname = NULL;
}
bool MQTTTopic::publish(const char* payload, bool retain) {
return client->publish(retain, topicname, payload);
}

View file

@ -0,0 +1,98 @@
/*
* Copyright (C) 2018 Andreas Motzek andreas-motzek@t-online.de
*
* This file is part of the MQTT Client package.
*
* You can use, redistribute and/or modify this file under the terms of the Modified Artistic License.
* See http://simplysomethings.de/open+source/modified+artistic+license.html for details.
*
* This file is distributed in the hope that it will be useful, but without any warranty; without even
* the implied warranty of merchantability or fitness for a particular purpose.
*/
#ifndef MQTTClient_h
#define MQTTClient_h
#include "Client.h"
#include "CooperativeMultitasking.h"
class MQTTClient {
friend class MQTTTopic;
private:
struct PublishPacket {
bool retain;
const char* topicname;
char* payload;
uint16_t packetid;
uint16_t trycount;
PublishPacket* next;
};
static MQTTClient* current;
CooperativeMultitasking* tasks;
Client* client;
char* host;
uint16_t port;
char* clientid;
char* username;
char* password;
uint16_t keepalive;
bool isconnected;
PublishPacket* head;
PublishPacket* tail;
void enqueuePublishPacket(PublishPacket* packet);
void transmitPublishPacketsAfter(unsigned long duration);
void transmitPublishPackets();
void removePublishPacket(uint16_t packetid);
void rotatePublishPackets();
bool sendConnectPacket();
void receiveConnectAcknowledgementPacket();
bool sendHeadPublishPacket();
void receivePublishAcknowledgementPacket();
void sendDisconnectPacket();
void writeTypeFlags(uint8_t type, uint8_t flags);
void writePacketLength(int value);
void writeLengthString(const char* value);
void writeString(const char* value, size_t len);
void writeShort(uint16_t value);
void writeByte(uint8_t value);
uint8_t readByte();
uint16_t readShort();
void flush();
int getWriteError();
int available();
void stop();
static char* strdupOrNull(const char* string);
protected:
bool publish(bool retain, const char* topicname, const char* payload);
public:
MQTTClient(CooperativeMultitasking* tasks, Client* client, const char* host, uint16_t port, const char* clientid, const char* username, const char* password, uint16_t keepalive = 300);
virtual ~MQTTClient();
bool connect();
bool connected();
bool publishAcknowledged();
void disconnect();
};
class MQTTTopic {
private:
MQTTClient* client;
char* topicname;
bool retain;
public:
MQTTTopic(MQTTClient* client, const char* topicname);
virtual ~MQTTTopic();
bool publish(const char* payload, bool retain = true);
};
#endif

View file

@ -0,0 +1,91 @@
## MQTT Client
MQTT Client is a library that can publish strings to a topic of a MQTT broker.
For using it you have to include the library
[Cooperative Multitasking](https://bitbucket.org/amotzek/cooperative-multitasking) too.
#### Example
The following example publishes the string `"Hello"` to the topic `"amotzek/hello"`
of the broker broker.hivemq.com repeatedly. The message can be observed in HiveMQs
[Websockets client](http://www.hivemq.com/demos/websocket-client/).
The example assumes that you connect to the internet via `Wifi101`, so you need the
SSID and the password of your WLAN. You also need an Arduino board that is compatible
with `Wifi101`, e.g. the MKR1000. If your board connects to the internet in a
different way, you have to pass a different client (for example an `EthernetClient`)
to the constructor of `MQTTClient`.
#include <Client.h>
#include <WiFi101.h>
#include <CooperativeMultitasking.h>
#include <MQTTClient.h>
char ssid[] = "...";
char pass[] = "...";
char host[] = "broker.hivemq.com";
char clientid[] = "...";
char username[] = "...";
char password[] = "...";
char topicname[] = "amotzek/hello";
CooperativeMultitasking tasks;
WiFiClient wificlient;
MQTTClient mqttclient(&tasks, &wificlient, host, 1883, clientid, username, password);
MQTTTopic topic(&mqttclient, topicname);
void setup() {
Serial.begin(9600);
//
while (!Serial) {
delay(1000);
}
//
WiFi.begin(ssid, pass);
delay(10000); // wait until WiFi connection is established
}
void loop() {
if (mqttclient.connect()) {
topic.publish("Hello");
//
while (tasks.available()) {
tasks.run(); // receive connect acknowledgement, send publish, receive publish acknowledgement
}
//
mqttclient.disconnect();
}
//
switch (WiFi.status()) {
case WL_CONNECT_FAILED:
case WL_CONNECTION_LOST:
case WL_DISCONNECTED: WiFi.begin(ssid, pass); // reconnect WiFi if necessary
}
//
delay(30000);
}
#### Details
To create an instance of class `MQTTClient` you need a reference to an instance of class
`CooperativeMultitasking`, a reference to an instance of a network connection (e.g. from
`WiFi101` or `Ethernet`), the host name of the MQTT broker, its port number (typically
1883) and a client id. If the broker requires that, you also have to pass a user name and
a password.
To create an instance of class `MQTTTopic` you have to pass a reference to an instance of
`MQTTClient` and the topic name.
The method `connect()` of `MQTTClient` creates a network connection to the broker.
You can publish a string by calling the method `publish("...")` of `MQTTTopic`. The
method can be called independently of an established connection to the MQTT broker. It
will delay publishing your message until a connection becomes available.
The message is published with QoS 1. That means that `MQTTClient` waits for a publish
acknowledgement from the MQTT broker. If the acknowledgement does not arrive in time,
`MQTTClient` retries publishing the message up to 15 times until it gives up.
`connect` and `publish` work asynchronously. They create tasks that are managed by the
instance of `CooperativeMultitasking`. That's why you have to call it's method `run()`
until all tasks are finished.

View file

@ -0,0 +1,59 @@
/*
* Publishes "Hello" at regular intervals. The message can be subscribed and observed in HiveMQs
* Websockets client http://www.hivemq.com/demos/websocket-client/
*
* Copyright (C) 2018 Andreas Motzek andreas-motzek@t-online.de
*
* You can use, redistribute and/or modify this file under the terms of the Modified Artistic License.
* See http://simplysomethings.de/open+source/modified+artistic+license.html for details.
*
* This file is distributed in the hope that it will be useful, but without any warranty; without even
* the implied warranty of merchantability or fitness for a particular purpose.
*/
#include <Client.h>
#include <WiFi101.h>
#include <CooperativeMultitasking.h>
#include <MQTTClient.h>
char ssid[] = "...";
char pass[] = "...";
char host[] = "broker.hivemq.com";
char clientid[] = " ";
char topicname[] = "amotzek/hello";
CooperativeMultitasking tasks;
WiFiClient wificlient;
MQTTClient mqttclient(&tasks, &wificlient, host, 1883, clientid, NULL, NULL);
MQTTTopic topic(&mqttclient, topicname);
void setup() {
Serial.begin(9600);
//
while (!Serial) {
delay(1000);
}
//
WiFi.begin(ssid, pass);
delay(10000);
}
void loop() {
if (mqttclient.connect()) {
topic.publish("Hello");
//
while (tasks.available()) {
tasks.run();
}
//
mqttclient.disconnect();
}
//
switch (WiFi.status()) {
case WL_CONNECT_FAILED:
case WL_CONNECTION_LOST:
case WL_DISCONNECTED: WiFi.begin(ssid, pass);
}
//
delay(30000);
}

View file

@ -0,0 +1,79 @@
/*
* Publishes "Hello" at regular intervals. The message can be subscribed and observed in HiveMQs
* Websockets client http://www.hivemq.com/demos/websocket-client/
*
* If the libraries Arduino Low Power and RTC Zero are included then Cooperative Multitasking
* uses LowPower.idle(...) instead of delay(...).
*
* Copyright (C) 2018 Andreas Motzek andreas-motzek@t-online.de
*
* You can use, redistribute and/or modify this file under the terms of the Modified Artistic License.
* See http://simplysomethings.de/open+source/modified+artistic+license.html for details.
*
* This file is distributed in the hope that it will be useful, but without any warranty; without even
* the implied warranty of merchantability or fitness for a particular purpose.
*/
#include <Client.h>
#include <WiFi101.h>
#include <ArduinoLowPower.h>
#include <CooperativeMultitasking.h>
#include <MQTTClient.h>
char ssid[] = "...";
char pass[] = "...";
char host[] = "broker.hivemq.com";
char clientid[] = " ";
char topicname[] = "amotzek/hello";
CooperativeMultitasking tasks;
WiFiClient wificlient;
MQTTClient mqttclient(&tasks, &wificlient, host, 1883, clientid, NULL, NULL);
MQTTTopic topic(&mqttclient, topicname);
void setup() {
Serial.begin(9600);
//
while (!Serial) {
delay(1000);
}
//
WiFi.begin(ssid, pass);
tasks.after(10000, checkWiFi); // after 10 seconds call checkWiFi()
tasks.after(10000, checkBroker); // after 10 seconds call checkBroker()
tasks.after(30000, publishHello); // after 30 seconds call publishHello()
}
void loop() {
tasks.run();
}
void checkWiFi() {
switch (WiFi.status()) {
case WL_CONNECT_FAILED:
case WL_CONNECTION_LOST:
case WL_DISCONNECTED:
Serial.println("wifi not connected");
WiFi.begin(ssid, pass);
tasks.after(10000, checkWiFi); // after 10 seconds call checkWiFi()
//
return;
}
//
tasks.after(30000, checkWiFi); // after 30 seconds call checkWiFi()
}
void checkBroker() {
if (!mqttclient.connected()) {
Serial.println("mqtt client not connected");
mqttclient.connect();
}
//
tasks.after(30000, checkBroker); // after 30 seconds call checkBroker()
}
void publishHello() {
if (mqttclient.connected()) topic.publish("Hello");
//
tasks.after(30000, publishHello); // after 30 seconds call publishHello()
}

View file

@ -0,0 +1,123 @@
/*
* Make the builtin LED blink, but only when it's dark. Publish light or dark condition to a
* MQTT topic.
*
* Connect an analog light sensor with the Arduino: GND to GND,
* VCC to VCC, SIG to A1.
*
* Copyright (C) 2018 Andreas Motzek andreas-motzek@t-online.de
*
* You can use, redistribute and/or modify this file under the terms of the Modified Artistic License.
* See http://simplysomethings.de/open+source/modified+artistic+license.html for details.
*
* This file is distributed in the hope that it will be useful, but without any warranty; without even
* the implied warranty of merchantability or fitness for a particular purpose.
*/
#include <Client.h>
#include <WiFi101.h>
#include <CooperativeMultitasking.h>
#include <MQTTClient.h>
char ssid[] = "... WLAN SSID ...";
char pass[] = "... WLAN password ...";
char host[] = "... MQTT broker host name ...";
char clientid[] = "... MQTT client id ...";
char username[] = "... MQTT user name ...";
char password[] = "... MQTT password ...";
char topicname[] = "... MQTT topic name ....";
CooperativeMultitasking tasks;
Continuation beginWiFiIfNeeded;
Continuation connectMQTTClientIfNeeded;
Continuation light;
Continuation dark;
Guard isDark;
Continuation on;
Continuation off;
Guard isLight;
WiFiClient wificlient;
MQTTClient mqttclient(&tasks, &wificlient, host, 1883, clientid, username, password);
MQTTTopic topic(&mqttclient, topicname);
void setup() {
Serial.begin(9600);
//
while (!Serial) {
delay(1000);
}
//
Serial.println("begin wifi");
//
WiFi.begin(ssid, pass);
tasks.after(10000, beginWiFiIfNeeded); // after 10 seconds call beginWiFiIfNeeded()
//
Serial.println("connect mqtt client");
//
mqttclient.connect();
tasks.after(15000, connectMQTTClientIfNeeded); // after 15 seconds call connectMQTTClientIfNeeded()
//
pinMode(LED_BUILTIN, OUTPUT);
tasks.now(light); // call light() now
}
void loop() {
tasks.run();
}
void light() {
topic.publish("\"light\"");
//
tasks.ifForThen(isDark, 5000, dark); // if isDark() for 5 seconds then call dark()
}
void dark() {
topic.publish("\"dark\"");
//
tasks.now(on); // call on() now
}
bool isDark() {
return analogRead(A1) < 300;
}
void on() {
digitalWrite(LED_BUILTIN, HIGH);
tasks.after(1000, off); // after 1 second call off()
}
void off() {
digitalWrite(LED_BUILTIN, LOW);
auto task1 = tasks.after(1000, on); // after 1 second call on()
auto task2 = tasks.ifForThen(isLight, 0, light); // if isLight() then call light()
tasks.onlyOneOf(task1, task2); // do either task1 or task2
}
bool isLight() {
return analogRead(A1) >= 400;
}
void beginWiFiIfNeeded() {
switch (WiFi.status()) {
case WL_IDLE_STATUS:
case WL_CONNECTED: tasks.after(30000, beginWiFiIfNeeded); return; // after 30 seconds call beginWiFiIfNeeded() again
case WL_NO_SHIELD: Serial.println("no wifi shield"); return; // do not check again
case WL_CONNECT_FAILED: Serial.println("wifi connect failed"); break;
case WL_CONNECTION_LOST: Serial.println("wifi connection lost"); break;
case WL_DISCONNECTED: Serial.println("wifi disconnected"); break;
}
//
WiFi.begin(ssid, pass);
tasks.after(10000, beginWiFiIfNeeded); // after 10 seconds call beginWiFiIfNeeded() again
}
void connectMQTTClientIfNeeded() {
if (!mqttclient.connected()) {
Serial.println("mqtt client not connected");
//
mqttclient.connect();
}
//
tasks.after(30000, connectMQTTClientIfNeeded); // after 30 seconds call connectMQTTClientIfNeeded() again
}

View file

@ -0,0 +1,7 @@
MQTTClient KEYWORD1
MQTTTopic KEYWORD1
connect KEYWORD2
connected KEYWORD2
publishAcknowledged KEYWORD2
disconnect KEYWORD2
publish KEYWORD2

View file

@ -0,0 +1,9 @@
name=MQTT Client
version=1.0.1
author=Andreas Motzek <andreas-motzek@t-online.de>
maintainer=Andreas Motzek <andreas-motzek@t-online.de>
sentence=MQTT Client lets you connect to a MQTT broker and publish strings to a topic.
paragraph=First create a MQTTClient. You have to pass a task list from Cooperative Multitasking, a client (e.g. WiFiClient from WiFi101), host, port and credentials of the MQTT broker. Then create a MQTTTopic. Use the MQTTClient and the topic name as arguments when doing that. To connect to the broker call MQTTClient::connect(). After that you can call MQTTTopic:publish("...") to publish a string. Requires Cooperative Multitasking for processing the acknowledgement packets of the broker. So you have call CooperativeMultitasking::run() in your loop() function. See the examples for details.
category=Communication
url=https://bitbucket.org/amotzek/arduino/src/fab21e1e7785fe9473d83107048d4431c8fd25a9/src/main/cpp/MQTTClient/?at=master
architectures=samd