From aa315f8472f40b4e4a63c8222626eda92b75da24 Mon Sep 17 00:00:00 2001 From: cschwinne Date: Sun, 17 Feb 2019 19:21:09 +0100 Subject: [PATCH] Switched from PubSubClient to AsyncMqttClient --- .../async-mqtt-client/AsyncMqttClient.cpp | 856 ++++++++++++++++++ .../async-mqtt-client/AsyncMqttClient.h | 6 + .../async-mqtt-client/AsyncMqttClient.hpp | 153 ++++ .../AsyncMqttClient/Callbacks.hpp | 28 + .../AsyncMqttClient/DisconnectReasons.hpp | 15 + .../AsyncMqttClient/Flags.hpp | 57 ++ .../AsyncMqttClient/Helpers.hpp | 38 + .../AsyncMqttClient/MessageProperties.hpp | 7 + .../AsyncMqttClient/Packets/ConnAckPacket.cpp | 30 + .../AsyncMqttClient/Packets/ConnAckPacket.hpp | 25 + .../AsyncMqttClient/Packets/Packet.hpp | 11 + .../Packets/PingRespPacket.cpp | 21 + .../Packets/PingRespPacket.hpp | 21 + .../AsyncMqttClient/Packets/PubAckPacket.cpp | 30 + .../AsyncMqttClient/Packets/PubAckPacket.hpp | 25 + .../AsyncMqttClient/Packets/PubCompPacket.cpp | 30 + .../AsyncMqttClient/Packets/PubCompPacket.hpp | 25 + .../AsyncMqttClient/Packets/PubRecPacket.cpp | 30 + .../AsyncMqttClient/Packets/PubRecPacket.hpp | 25 + .../AsyncMqttClient/Packets/PubRelPacket.cpp | 30 + .../AsyncMqttClient/Packets/PubRelPacket.hpp | 25 + .../AsyncMqttClient/Packets/PublishPacket.cpp | 91 ++ .../AsyncMqttClient/Packets/PublishPacket.hpp | 38 + .../AsyncMqttClient/Packets/SubAckPacket.cpp | 46 + .../AsyncMqttClient/Packets/SubAckPacket.hpp | 25 + .../Packets/UnsubAckPacket.cpp | 30 + .../Packets/UnsubAckPacket.hpp | 25 + .../AsyncMqttClient/ParsingInformation.hpp | 21 + .../AsyncMqttClient/Storage.hpp | 13 + .../dependencies/async-mqtt-client/LICENSE | 21 + .../dependencies/async-mqtt-client/README.md | 18 + .../src/dependencies/pubsubclient/LICENSE.txt | 20 - .../pubsubclient/PubSubClient.cpp | 601 ------------ .../dependencies/pubsubclient/PubSubClient.h | 144 --- wled00/wled00.ino | 14 +- wled00/wled05_init.ino | 5 +- wled00/wled08_led.ino | 2 +- wled00/wled17_mqtt.ino | 117 +-- wled00/wled18_server.ino | 2 +- 39 files changed, 1868 insertions(+), 853 deletions(-) create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.h create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Callbacks.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/DisconnectReasons.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Flags.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Helpers.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/MessageProperties.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/Packet.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.cpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/ParsingInformation.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Storage.hpp create mode 100644 wled00/src/dependencies/async-mqtt-client/LICENSE create mode 100644 wled00/src/dependencies/async-mqtt-client/README.md delete mode 100644 wled00/src/dependencies/pubsubclient/LICENSE.txt delete mode 100644 wled00/src/dependencies/pubsubclient/PubSubClient.cpp delete mode 100644 wled00/src/dependencies/pubsubclient/PubSubClient.h diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp new file mode 100644 index 00000000..b3eff719 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp @@ -0,0 +1,856 @@ +#include "AsyncMqttClient.hpp" + +AsyncMqttClient::AsyncMqttClient() +: _connected(false) +, _connectPacketNotEnoughSpace(false) +, _disconnectFlagged(false) +, _tlsBadFingerprint(false) +, _lastClientActivity(0) +, _lastServerActivity(0) +, _lastPingRequestTime(0) +, _host(nullptr) +, _useIp(false) +#if ASYNC_TCP_SSL_ENABLED +, _secure(false) +#endif +, _port(0) +, _keepAlive(15) +, _cleanSession(true) +, _clientId(nullptr) +, _username(nullptr) +, _password(nullptr) +, _willTopic(nullptr) +, _willPayload(nullptr) +, _willPayloadLength(0) +, _willQos(0) +, _willRetain(false) +, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE } +, _currentParsedPacket(nullptr) +, _remainingLengthBufferPosition(0) +, _nextPacketId(1) { + _client.onConnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onConnect(c); }, this); + _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onDisconnect(c); }, this); + _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast(obj))->_onError(c, error); }, this); + _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast(obj))->_onTimeout(c, time); }, this); + _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(c, len, time); }, this); + _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast(obj))->_onData(c, static_cast(data), len); }, this); + _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(c); }, this); + +#ifdef ESP32 + sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac()); +#elif defined(ESP8266) + sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); +#endif + _clientId = _generatedClientId; + + setMaxTopicLength(128); +} + +AsyncMqttClient::~AsyncMqttClient() { + delete _currentParsedPacket; + delete[] _parsingInformation.topicBuffer; +} + +AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { + _keepAlive = keepAlive; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) { + _clientId = clientId; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) { + _cleanSession = cleanSession; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) { + _parsingInformation.maxTopicLength = maxTopicLength; + delete[] _parsingInformation.topicBuffer; + _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) { + _username = username; + _password = password; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) { + _willTopic = topic; + _willQos = qos; + _willRetain = retain; + _willPayload = payload; + _willPayloadLength = length; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) { + _useIp = true; + _ip = ip; + _port = port; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) { + _useIp = false; + _host = host; + _port = port; + return *this; +} + +#if ASYNC_TCP_SSL_ENABLED +AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) { + _secure = secure; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) { + std::array newFingerprint; + memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); + _secureServerFingerprints.push_back(newFingerprint); + return *this; +} +#endif + +AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) { + _onConnectUserCallbacks.push_back(callback); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) { + _onDisconnectUserCallbacks.push_back(callback); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) { + _onSubscribeUserCallbacks.push_back(callback); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) { + _onUnsubscribeUserCallbacks.push_back(callback); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) { + _onMessageUserCallbacks.push_back(callback); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) { + _onPublishUserCallbacks.push_back(callback); + return *this; +} + +void AsyncMqttClient::_freeCurrentParsedPacket() { + delete _currentParsedPacket; + _currentParsedPacket = nullptr; +} + +void AsyncMqttClient::_clear() { + _lastPingRequestTime = 0; + _connected = false; + _disconnectFlagged = false; + _connectPacketNotEnoughSpace = false; + _tlsBadFingerprint = false; + _freeCurrentParsedPacket(); + + _pendingPubRels.clear(); + _pendingPubRels.shrink_to_fit(); + + _toSendAcks.clear(); + _toSendAcks.shrink_to_fit(); + + _nextPacketId = 1; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; +} + +/* TCP */ +void AsyncMqttClient::_onConnect(AsyncClient* client) { + (void)client; + +#if ASYNC_TCP_SSL_ENABLED + if (_secure && _secureServerFingerprints.size() > 0) { + SSL* clientSsl = _client.getSSL(); + + bool sslFoundFingerprint = false; + for (std::array fingerprint : _secureServerFingerprints) { + if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { + sslFoundFingerprint = true; + break; + } + } + + if (!sslFoundFingerprint) { + _tlsBadFingerprint = true; + _client.close(true); + return; + } + } +#endif + + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED; + + uint16_t protocolNameLength = 4; + char protocolNameLengthBytes[2]; + protocolNameLengthBytes[0] = protocolNameLength >> 8; + protocolNameLengthBytes[1] = protocolNameLength & 0xFF; + + char protocolLevel[1]; + protocolLevel[0] = 0x04; + + char connectFlags[1]; + connectFlags[0] = 0; + if (_cleanSession) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION; + if (_username != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME; + if (_password != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD; + if (_willTopic != nullptr) { + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL; + if (_willRetain) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN; + switch (_willQos) { + case 0: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0; + break; + case 1: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1; + break; + case 2: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2; + break; + } + } + + char keepAliveBytes[2]; + keepAliveBytes[0] = _keepAlive >> 8; + keepAliveBytes[1] = _keepAlive & 0xFF; + + uint16_t clientIdLength = strlen(_clientId); + char clientIdLengthBytes[2]; + clientIdLengthBytes[0] = clientIdLength >> 8; + clientIdLengthBytes[1] = clientIdLength & 0xFF; + + // Optional fields + uint16_t willTopicLength = 0; + char willTopicLengthBytes[2]; + uint16_t willPayloadLength = _willPayloadLength; + char willPayloadLengthBytes[2]; + if (_willTopic != nullptr) { + willTopicLength = strlen(_willTopic); + willTopicLengthBytes[0] = willTopicLength >> 8; + willTopicLengthBytes[1] = willTopicLength & 0xFF; + + if (_willPayload != nullptr && willPayloadLength == 0) willPayloadLength = strlen(_willPayload); + + willPayloadLengthBytes[0] = willPayloadLength >> 8; + willPayloadLengthBytes[1] = willPayloadLength & 0xFF; + } + + uint16_t usernameLength = 0; + char usernameLengthBytes[2]; + if (_username != nullptr) { + usernameLength = strlen(_username); + usernameLengthBytes[0] = usernameLength >> 8; + usernameLengthBytes[1] = usernameLength & 0xFF; + } + + uint16_t passwordLength = 0; + char passwordLengthBytes[2]; + if (_password != nullptr) { + passwordLength = strlen(_password); + passwordLengthBytes[0] = passwordLength >> 8; + passwordLengthBytes[1] = passwordLength & 0xFF; + } + + uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present + if (_willTopic != nullptr) remainingLength += 2 + willTopicLength + 2 + willPayloadLength; + if (_username != nullptr) remainingLength += 2 + usernameLength; + if (_password != nullptr) remainingLength += 2 + passwordLength; + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); + + uint32_t neededSpace = 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += protocolNameLength; + neededSpace += 1; + neededSpace += 1; + neededSpace += 2; + neededSpace += 2; + neededSpace += clientIdLength; + if (_willTopic != nullptr) { + neededSpace += 2; + neededSpace += willTopicLength; + + neededSpace += 2; + if (_willPayload != nullptr) neededSpace += willPayloadLength; + } + if (_username != nullptr) { + neededSpace += 2; + neededSpace += usernameLength; + } + if (_password != nullptr) { + neededSpace += 2; + neededSpace += passwordLength; + } + + if (_client.space() < neededSpace) { + _connectPacketNotEnoughSpace = true; + _client.close(true); + return; + } + + _client.add(fixedHeader, 1 + remainingLengthLength); + _client.add(protocolNameLengthBytes, 2); + _client.add("MQTT", protocolNameLength); + _client.add(protocolLevel, 1); + _client.add(connectFlags, 1); + _client.add(keepAliveBytes, 2); + _client.add(clientIdLengthBytes, 2); + _client.add(_clientId, clientIdLength); + if (_willTopic != nullptr) { + _client.add(willTopicLengthBytes, 2); + _client.add(_willTopic, willTopicLength); + + _client.add(willPayloadLengthBytes, 2); + if (_willPayload != nullptr) _client.add(_willPayload, willPayloadLength); + } + if (_username != nullptr) { + _client.add(usernameLengthBytes, 2); + _client.add(_username, usernameLength); + } + if (_password != nullptr) { + _client.add(passwordLengthBytes, 2); + _client.add(_password, passwordLength); + } + _client.send(); + _lastClientActivity = millis(); +} + +void AsyncMqttClient::_onDisconnect(AsyncClient* client) { + (void)client; + AsyncMqttClientDisconnectReason reason; + + if (_connectPacketNotEnoughSpace) { + reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE; + } else if (_tlsBadFingerprint) { + reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; + } else { + reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; + } + + _clear(); + + for (auto callback : _onDisconnectUserCallbacks) callback(reason); + + _connectPacketNotEnoughSpace = false; + _tlsBadFingerprint = false; +} + +void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) { + (void)client; + (void)error; + // _onDisconnect called anyway +} + +void AsyncMqttClient::_onTimeout(AsyncClient* client, uint32_t time) { + (void)client; + (void)time; + // disconnection will be handled by ping/pong management +} + +void AsyncMqttClient::_onAck(AsyncClient* client, size_t len, uint32_t time) { + (void)client; + (void)len; + (void)time; +} + +void AsyncMqttClient::_onData(AsyncClient* client, char* data, size_t len) { + (void)client; + size_t currentBytePosition = 0; + char currentByte; + do { + switch (_parsingInformation.bufferState) { + case AsyncMqttClientInternals::BufferState::NONE: + currentByte = data[currentBytePosition++]; + _parsingInformation.packetType = currentByte >> 4; + _parsingInformation.packetFlags = (currentByte << 4) >> 4; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; + _lastServerActivity = millis(); + switch (_parsingInformation.packetType) { + case AsyncMqttClientInternals::PacketType.CONNACK: + _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.PINGRESP: + _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); + break; + case AsyncMqttClientInternals::PacketType.SUBACK: + _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.UNSUBACK: + _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBLISH: + _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.PUBREL: + _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBACK: + _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBREC: + _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBCOMP: + _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); + break; + default: + break; + } + break; + case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: + currentByte = data[currentBytePosition++]; + _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; + if (currentByte >> 7 == 0) { + _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); + _remainingLengthBufferPosition = 0; + if (_parsingInformation.remainingLength > 0) { + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; + } else { + // PINGRESP is a special case where it has no variable header, so the packet ends right here + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; + _onPingResp(); + } + } + break; + case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: + _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); + break; + case AsyncMqttClientInternals::BufferState::PAYLOAD: + _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); + break; + default: + currentBytePosition = len; + } + } while (currentBytePosition != len); +} + +void AsyncMqttClient::_onPoll(AsyncClient* client) { + if (!_connected) return; + + // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections + if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { + disconnect(); + return; + // send ping to ensure the server will receive at least one message inside keepalive window + } else if (_lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + + // send ping to verify if the server is still there (ensure this is not a half connection) + } else if (_connected && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + } + + // handle to send ack packets + + _sendAcks(); + + // handle disconnect + + if (_disconnectFlagged) { + _sendDisconnect(); + } +} + +/* MQTT */ +void AsyncMqttClient::_onPingResp() { + _freeCurrentParsedPacket(); + _lastPingRequestTime = 0; +} + +void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) { + (void)sessionPresent; + _freeCurrentParsedPacket(); + + if (connectReturnCode == 0) { + _connected = true; + for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); + } else { + _clear(); + for (auto callback : _onDisconnectUserCallbacks) callback(static_cast(connectReturnCode)); + } +} + +void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) { + _freeCurrentParsedPacket(); + + for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status); +} + +void AsyncMqttClient::_onUnsubAck(uint16_t packetId) { + _freeCurrentParsedPacket(); + + for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId); +} + +void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) { + bool notifyPublish = true; + + if (qos == 2) { + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + notifyPublish = false; + break; + } + } + } + + if (notifyPublish) { + AsyncMqttClientMessageProperties properties; + properties.qos = qos; + properties.dup = dup; + properties.retain = retain; + + for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); + } +} + +void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) { + AsyncMqttClientInternals::PendingAck pendingAck; + + if (qos == 1) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; + pendingAck.packetId = packetId; + _toSendAcks.push_back(pendingAck); + } else if (qos == 2) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; + pendingAck.packetId = packetId; + _toSendAcks.push_back(pendingAck); + + bool pubRelAwaiting = false; + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + pubRelAwaiting = true; + break; + } + } + + if (!pubRelAwaiting) { + AsyncMqttClientInternals::PendingPubRel pendingPubRel; + pendingPubRel.packetId = packetId; + _pendingPubRels.push_back(pendingPubRel); + } + + _sendAcks(); + } + + _freeCurrentParsedPacket(); +} + +void AsyncMqttClient::_onPubRel(uint16_t packetId) { + _freeCurrentParsedPacket(); + + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; + pendingAck.packetId = packetId; + _toSendAcks.push_back(pendingAck); + + for (size_t i = 0; i < _pendingPubRels.size(); i++) { + if (_pendingPubRels[i].packetId == packetId) { + _pendingPubRels.erase(_pendingPubRels.begin() + i); + _pendingPubRels.shrink_to_fit(); + } + } + + _sendAcks(); +} + +void AsyncMqttClient::_onPubAck(uint16_t packetId) { + _freeCurrentParsedPacket(); + + for (auto callback : _onPublishUserCallbacks) callback(packetId); +} + +void AsyncMqttClient::_onPubRec(uint16_t packetId) { + _freeCurrentParsedPacket(); + + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; + pendingAck.packetId = packetId; + _toSendAcks.push_back(pendingAck); + + _sendAcks(); +} + +void AsyncMqttClient::_onPubComp(uint16_t packetId) { + _freeCurrentParsedPacket(); + + for (auto callback : _onPublishUserCallbacks) callback(packetId); +} + +bool AsyncMqttClient::_sendPing() { + char fixedHeader[2]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.PINGREQ; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED; + fixedHeader[1] = 0; + + size_t neededSpace = 2; + + if (_client.space() < neededSpace) return false; + + _client.add(fixedHeader, 2); + _client.send(); + _lastClientActivity = millis(); + _lastPingRequestTime = millis(); + + return true; +} + +void AsyncMqttClient::_sendAcks() { + uint8_t neededAckSpace = 2 + 2; + + for (size_t i = 0; i < _toSendAcks.size(); i++) { + if (_client.space() < neededAckSpace) break; + + AsyncMqttClientInternals::PendingAck pendingAck = _toSendAcks[i]; + + char fixedHeader[2]; + fixedHeader[0] = pendingAck.packetType; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | pendingAck.headerFlag; + fixedHeader[1] = 2; + + char packetIdBytes[2]; + packetIdBytes[0] = pendingAck.packetId >> 8; + packetIdBytes[1] = pendingAck.packetId & 0xFF; + + _client.add(fixedHeader, 2); + _client.add(packetIdBytes, 2); + _client.send(); + + _toSendAcks.erase(_toSendAcks.begin() + i); + _toSendAcks.shrink_to_fit(); + + _lastClientActivity = millis(); + } +} + +bool AsyncMqttClient::_sendDisconnect() { + const uint8_t neededSpace = 2; + + if (_client.space() < neededSpace) return false; + + char fixedHeader[2]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED; + fixedHeader[1] = 0; + + _client.add(fixedHeader, 2); + _client.send(); + _client.close(true); + + _disconnectFlagged = false; + + return true; +} + +uint16_t AsyncMqttClient::_getNextPacketId() { + uint16_t nextPacketId = _nextPacketId; + + if (_nextPacketId == 65535) _nextPacketId = 0; // 0 is forbidden + _nextPacketId++; + + return nextPacketId; +} + +bool AsyncMqttClient::connected() const { + return _connected; +} + +void AsyncMqttClient::connect() { + if (_connected) return; + +#if ASYNC_TCP_SSL_ENABLED + if (_useIp) { + _client.connect(_ip, _port, _secure); + } else { + _client.connect(_host, _port, _secure); + } +#else + if (_useIp) { + _client.connect(_ip, _port); + } else { + _client.connect(_host, _port); + } +#endif +} + +void AsyncMqttClient::disconnect(bool force) { + if (!_connected) return; + + if (force) { + _client.close(true); + } else { + _disconnectFlagged = true; + _sendDisconnect(); + _client.send(); + } +} + +uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { + if (!_connected) return 0; + + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED; + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + char qosByte[1]; + qosByte[0] = qos; + + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += 2; + neededSpace += topicLength; + neededSpace += 1; + if (_client.space() < neededSpace) return 0; + + uint16_t packetId = _getNextPacketId(); + char packetIdBytes[2]; + packetIdBytes[0] = packetId >> 8; + packetIdBytes[1] = packetId & 0xFF; + + _client.add(fixedHeader, 1 + remainingLengthLength); + _client.add(packetIdBytes, 2); + _client.add(topicLengthBytes, 2); + _client.add(topic, topicLength); + _client.add(qosByte, 1); + _client.send(); + _lastClientActivity = millis(); + + return packetId; +} + +uint16_t AsyncMqttClient::unsubscribe(const char* topic) { + if (!_connected) return 0; + + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED; + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += 2; + neededSpace += topicLength; + if (_client.space() < neededSpace) return 0; + + uint16_t packetId = _getNextPacketId(); + char packetIdBytes[2]; + packetIdBytes[0] = packetId >> 8; + packetIdBytes[1] = packetId & 0xFF; + + _client.add(fixedHeader, 1 + remainingLengthLength); + _client.add(packetIdBytes, 2); + _client.add(topicLengthBytes, 2); + _client.add(topic, topicLength); + _client.send(); + _lastClientActivity = millis(); + + return packetId; +} + +uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) { + if (!_connected) return 0; + + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH; + fixedHeader[0] = fixedHeader[0] << 4; + if (dup) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP; + if (retain) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN; + switch (qos) { + case 0: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0; + break; + case 1: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1; + break; + case 2: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2; + break; + } + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + uint32_t payloadLength = length; + if (payload != nullptr && payloadLength == 0) payloadLength = strlen(payload); + + uint32_t remainingLength = 2 + topicLength + payloadLength; + if (qos != 0) remainingLength += 2; + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += topicLength; + if (qos != 0) neededSpace += 2; + if (payload != nullptr) neededSpace += payloadLength; + if (_client.space() < neededSpace) return 0; + + uint16_t packetId = 0; + char packetIdBytes[2]; + if (qos != 0) { + if (dup && message_id > 0) { + packetId = message_id; + } else { + packetId = _getNextPacketId(); + } + + packetIdBytes[0] = packetId >> 8; + packetIdBytes[1] = packetId & 0xFF; + } + + _client.add(fixedHeader, 1 + remainingLengthLength); + _client.add(topicLengthBytes, 2); + _client.add(topic, topicLength); + if (qos != 0) _client.add(packetIdBytes, 2); + if (payload != nullptr) _client.add(payload, payloadLength); + _client.send(); + _lastClientActivity = millis(); + + if (qos != 0) { + return packetId; + } else { + return 1; + } +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.h b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.h new file mode 100644 index 00000000..23d30554 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.h @@ -0,0 +1,6 @@ +#ifndef SRC_ASYNCMQTTCLIENT_H_ +#define SRC_ASYNCMQTTCLIENT_H_ + +#include "AsyncMqttClient.hpp" + +#endif // SRC_ASYNCMQTTCLIENT_H_ diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp new file mode 100644 index 00000000..f4191f90 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp @@ -0,0 +1,153 @@ +#pragma once + +#include +#include + +#include "Arduino.h" + +#ifdef ESP32 +#include +#elif defined(ESP8266) +#include +#else +#error Platform not supported +#endif + +#if ASYNC_TCP_SSL_ENABLED +#include +#define SHA1_SIZE 20 +#endif + +#include "AsyncMqttClient/Flags.hpp" +#include "AsyncMqttClient/ParsingInformation.hpp" +#include "AsyncMqttClient/MessageProperties.hpp" +#include "AsyncMqttClient/Helpers.hpp" +#include "AsyncMqttClient/Callbacks.hpp" +#include "AsyncMqttClient/DisconnectReasons.hpp" +#include "AsyncMqttClient/Storage.hpp" + +#include "AsyncMqttClient/Packets/Packet.hpp" +#include "AsyncMqttClient/Packets/ConnAckPacket.hpp" +#include "AsyncMqttClient/Packets/PingRespPacket.hpp" +#include "AsyncMqttClient/Packets/SubAckPacket.hpp" +#include "AsyncMqttClient/Packets/UnsubAckPacket.hpp" +#include "AsyncMqttClient/Packets/PublishPacket.hpp" +#include "AsyncMqttClient/Packets/PubRelPacket.hpp" +#include "AsyncMqttClient/Packets/PubAckPacket.hpp" +#include "AsyncMqttClient/Packets/PubRecPacket.hpp" +#include "AsyncMqttClient/Packets/PubCompPacket.hpp" + +class AsyncMqttClient { + public: + AsyncMqttClient(); + ~AsyncMqttClient(); + + AsyncMqttClient& setKeepAlive(uint16_t keepAlive); + AsyncMqttClient& setClientId(const char* clientId); + AsyncMqttClient& setCleanSession(bool cleanSession); + AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength); + AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr); + AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0); + AsyncMqttClient& setServer(IPAddress ip, uint16_t port); + AsyncMqttClient& setServer(const char* host, uint16_t port); +#if ASYNC_TCP_SSL_ENABLED + AsyncMqttClient& setSecure(bool secure); + AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint); +#endif + + AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); + AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); + AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); + AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); + AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback); + AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); + + bool connected() const; + void connect(); + void disconnect(bool force = false); + uint16_t subscribe(const char* topic, uint8_t qos); + uint16_t unsubscribe(const char* topic); + uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); + + private: + AsyncClient _client; + + bool _connected; + bool _connectPacketNotEnoughSpace; + bool _disconnectFlagged; + bool _tlsBadFingerprint; + uint32_t _lastClientActivity; + uint32_t _lastServerActivity; + uint32_t _lastPingRequestTime; + + char _generatedClientId[13 + 1]; // esp8266abc123 + IPAddress _ip; + const char* _host; + bool _useIp; +#if ASYNC_TCP_SSL_ENABLED + bool _secure; +#endif + uint16_t _port; + uint16_t _keepAlive; + bool _cleanSession; + const char* _clientId; + const char* _username; + const char* _password; + const char* _willTopic; + const char* _willPayload; + uint16_t _willPayloadLength; + uint8_t _willQos; + bool _willRetain; + +#if ASYNC_TCP_SSL_ENABLED + std::vector> _secureServerFingerprints; +#endif + + std::vector _onConnectUserCallbacks; + std::vector _onDisconnectUserCallbacks; + std::vector _onSubscribeUserCallbacks; + std::vector _onUnsubscribeUserCallbacks; + std::vector _onMessageUserCallbacks; + std::vector _onPublishUserCallbacks; + + AsyncMqttClientInternals::ParsingInformation _parsingInformation; + AsyncMqttClientInternals::Packet* _currentParsedPacket; + uint8_t _remainingLengthBufferPosition; + char _remainingLengthBuffer[4]; + + uint16_t _nextPacketId; + + std::vector _pendingPubRels; + + std::vector _toSendAcks; + + void _clear(); + void _freeCurrentParsedPacket(); + + // TCP + void _onConnect(AsyncClient* client); + void _onDisconnect(AsyncClient* client); + static void _onError(AsyncClient* client, int8_t error); + void _onTimeout(AsyncClient* client, uint32_t time); + static void _onAck(AsyncClient* client, size_t len, uint32_t time); + void _onData(AsyncClient* client, char* data, size_t len); + void _onPoll(AsyncClient* client); + + // MQTT + void _onPingResp(); + void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); + void _onSubAck(uint16_t packetId, char status); + void _onUnsubAck(uint16_t packetId); + void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); + void _onPublish(uint16_t packetId, uint8_t qos); + void _onPubRel(uint16_t packetId); + void _onPubAck(uint16_t packetId); + void _onPubRec(uint16_t packetId); + void _onPubComp(uint16_t packetId); + + bool _sendPing(); + void _sendAcks(); + bool _sendDisconnect(); + + uint16_t _getNextPacketId(); +}; diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Callbacks.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Callbacks.hpp new file mode 100644 index 00000000..7c3d63dd --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Callbacks.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include "DisconnectReasons.hpp" +#include "MessageProperties.hpp" + +namespace AsyncMqttClientInternals { +// user callbacks +typedef std::function OnConnectUserCallback; +typedef std::function OnDisconnectUserCallback; +typedef std::function OnSubscribeUserCallback; +typedef std::function OnUnsubscribeUserCallback; +typedef std::function OnMessageUserCallback; +typedef std::function OnPublishUserCallback; + +// internal callbacks +typedef std::function OnConnAckInternalCallback; +typedef std::function OnPingRespInternalCallback; +typedef std::function OnSubAckInternalCallback; +typedef std::function OnUnsubAckInternalCallback; +typedef std::function OnMessageInternalCallback; +typedef std::function OnPublishInternalCallback; +typedef std::function OnPubRelInternalCallback; +typedef std::function OnPubAckInternalCallback; +typedef std::function OnPubRecInternalCallback; +typedef std::function OnPubCompInternalCallback; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/DisconnectReasons.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/DisconnectReasons.hpp new file mode 100644 index 00000000..f4cbda8a --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/DisconnectReasons.hpp @@ -0,0 +1,15 @@ +#pragma once + +enum class AsyncMqttClientDisconnectReason : int8_t { + TCP_DISCONNECTED = 0, + + MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 1, + MQTT_IDENTIFIER_REJECTED = 2, + MQTT_SERVER_UNAVAILABLE = 3, + MQTT_MALFORMED_CREDENTIALS = 4, + MQTT_NOT_AUTHORIZED = 5, + + ESP8266_NOT_ENOUGH_SPACE = 6, + + TLS_BAD_FINGERPRINT = 7 +}; diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Flags.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Flags.hpp new file mode 100644 index 00000000..a1fb3e3c --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Flags.hpp @@ -0,0 +1,57 @@ +#pragma once + +namespace AsyncMqttClientInternals { +constexpr struct { + const uint8_t RESERVED = 0; + const uint8_t CONNECT = 1; + const uint8_t CONNACK = 2; + const uint8_t PUBLISH = 3; + const uint8_t PUBACK = 4; + const uint8_t PUBREC = 5; + const uint8_t PUBREL = 6; + const uint8_t PUBCOMP = 7; + const uint8_t SUBSCRIBE = 8; + const uint8_t SUBACK = 9; + const uint8_t UNSUBSCRIBE = 10; + const uint8_t UNSUBACK = 11; + const uint8_t PINGREQ = 12; + const uint8_t PINGRESP = 13; + const uint8_t DISCONNECT = 14; + const uint8_t RESERVED2 = 1; +} PacketType; + +constexpr struct { + const uint8_t CONNECT_RESERVED = 0x00; + const uint8_t CONNACK_RESERVED = 0x00; + const uint8_t PUBLISH_DUP = 0x08; + const uint8_t PUBLISH_QOS0 = 0x00; + const uint8_t PUBLISH_QOS1 = 0x02; + const uint8_t PUBLISH_QOS2 = 0x04; + const uint8_t PUBLISH_QOSRESERVED = 0x06; + const uint8_t PUBLISH_RETAIN = 0x01; + const uint8_t PUBACK_RESERVED = 0x00; + const uint8_t PUBREC_RESERVED = 0x00; + const uint8_t PUBREL_RESERVED = 0x02; + const uint8_t PUBCOMP_RESERVED = 0x00; + const uint8_t SUBSCRIBE_RESERVED = 0x02; + const uint8_t SUBACK_RESERVED = 0x00; + const uint8_t UNSUBSCRIBE_RESERVED = 0x02; + const uint8_t UNSUBACK_RESERVED = 0x00; + const uint8_t PINGREQ_RESERVED = 0x00; + const uint8_t PINGRESP_RESERVED = 0x00; + const uint8_t DISCONNECT_RESERVED = 0x00; + const uint8_t RESERVED2_RESERVED = 0x00; +} HeaderFlag; + +constexpr struct { + const uint8_t USERNAME = 0x80; + const uint8_t PASSWORD = 0x40; + const uint8_t WILL_RETAIN = 0x20; + const uint8_t WILL_QOS0 = 0x00; + const uint8_t WILL_QOS1 = 0x08; + const uint8_t WILL_QOS2 = 0x10; + const uint8_t WILL = 0x04; + const uint8_t CLEAN_SESSION = 0x02; + const uint8_t RESERVED = 0x00; +} ConnectFlag; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Helpers.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Helpers.hpp new file mode 100644 index 00000000..5737c02e --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Helpers.hpp @@ -0,0 +1,38 @@ +#pragma once + +namespace AsyncMqttClientInternals { +class Helpers { + public: + static uint32_t decodeRemainingLength(char* bytes) { + uint32_t multiplier = 1; + uint32_t value = 0; + uint8_t currentByte = 0; + uint8_t encodedByte; + do { + encodedByte = bytes[currentByte++]; + value += (encodedByte & 127) * multiplier; + multiplier *= 128; + } while ((encodedByte & 128) != 0); + + return value; + } + + static uint8_t encodeRemainingLength(uint32_t remainingLength, char* destination) { + uint8_t currentByte = 0; + uint8_t bytesNeeded = 0; + + do { + uint8_t encodedByte = remainingLength % 128; + remainingLength /= 128; + if (remainingLength > 0) { + encodedByte = encodedByte | 128; + } + + destination[currentByte++] = encodedByte; + bytesNeeded++; + } while (remainingLength > 0); + + return bytesNeeded; + } +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/MessageProperties.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/MessageProperties.hpp new file mode 100644 index 00000000..c04b5966 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/MessageProperties.hpp @@ -0,0 +1,7 @@ +#pragma once + +struct AsyncMqttClientMessageProperties { + uint8_t qos; + bool dup; + bool retain; +}; diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.cpp new file mode 100644 index 00000000..f9091c21 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.cpp @@ -0,0 +1,30 @@ +#include "ConnAckPacket.hpp" + +using AsyncMqttClientInternals::ConnAckPacket; + +ConnAckPacket::ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _sessionPresent(false) +, _connectReturnCode(0) { +} + +ConnAckPacket::~ConnAckPacket() { +} + +void ConnAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _sessionPresent = (currentByte << 7) >> 7; + } else { + _connectReturnCode = currentByte; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_sessionPresent, _connectReturnCode); + } +} + +void ConnAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.hpp new file mode 100644 index 00000000..8be9ab19 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/ConnAckPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class ConnAckPacket : public Packet { + public: + explicit ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback); + ~ConnAckPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnConnAckInternalCallback _callback; + + uint8_t _bytePosition; + bool _sessionPresent; + uint8_t _connectReturnCode; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/Packet.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/Packet.hpp new file mode 100644 index 00000000..9552cf06 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/Packet.hpp @@ -0,0 +1,11 @@ +#pragma once + +namespace AsyncMqttClientInternals { +class Packet { + public: + virtual ~Packet() {} + + virtual void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) = 0; + virtual void parsePayload(char* data, size_t len, size_t* currentBytePosition) = 0; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.cpp new file mode 100644 index 00000000..2a939aac --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.cpp @@ -0,0 +1,21 @@ +#include "PingRespPacket.hpp" + +using AsyncMqttClientInternals::PingRespPacket; + +PingRespPacket::PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) { +} + +PingRespPacket::~PingRespPacket() { +} + +void PingRespPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} + +void PingRespPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.hpp new file mode 100644 index 00000000..043a7303 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PingRespPacket.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PingRespPacket : public Packet { + public: + explicit PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback); + ~PingRespPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnPingRespInternalCallback _callback; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.cpp new file mode 100644 index 00000000..efa5fa41 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.cpp @@ -0,0 +1,30 @@ +#include "PubAckPacket.hpp" + +using AsyncMqttClientInternals::PubAckPacket; + +PubAckPacket::PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +PubAckPacket::~PubAckPacket() { +} + +void PubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId); + } +} + +void PubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.hpp new file mode 100644 index 00000000..bd00142d --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubAckPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PubAckPacket : public Packet { + public: + explicit PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback); + ~PubAckPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnPubAckInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.cpp new file mode 100644 index 00000000..2b3e00de --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.cpp @@ -0,0 +1,30 @@ +#include "PubCompPacket.hpp" + +using AsyncMqttClientInternals::PubCompPacket; + +PubCompPacket::PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +PubCompPacket::~PubCompPacket() { +} + +void PubCompPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId); + } +} + +void PubCompPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.hpp new file mode 100644 index 00000000..17c1db48 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubCompPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PubCompPacket : public Packet { + public: + explicit PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback); + ~PubCompPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnPubCompInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.cpp new file mode 100644 index 00000000..ec535c61 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.cpp @@ -0,0 +1,30 @@ +#include "PubRecPacket.hpp" + +using AsyncMqttClientInternals::PubRecPacket; + +PubRecPacket::PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +PubRecPacket::~PubRecPacket() { +} + +void PubRecPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId); + } +} + +void PubRecPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.hpp new file mode 100644 index 00000000..910130a2 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRecPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PubRecPacket : public Packet { + public: + explicit PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback); + ~PubRecPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnPubRecInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.cpp new file mode 100644 index 00000000..2d5abde0 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.cpp @@ -0,0 +1,30 @@ +#include "PubRelPacket.hpp" + +using AsyncMqttClientInternals::PubRelPacket; + +PubRelPacket::PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +PubRelPacket::~PubRelPacket() { +} + +void PubRelPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId); + } +} + +void PubRelPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.hpp new file mode 100644 index 00000000..edea3d53 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PubRelPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PubRelPacket : public Packet { + public: + explicit PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback); + ~PubRelPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnPubRelInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.cpp new file mode 100644 index 00000000..dab30a15 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.cpp @@ -0,0 +1,91 @@ +#include "PublishPacket.hpp" + +using AsyncMqttClientInternals::PublishPacket; + +PublishPacket::PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback) +: _parsingInformation(parsingInformation) +, _dataCallback(dataCallback) +, _completeCallback(completeCallback) +, _dup(false) +, _qos(0) +, _retain(0) +, _bytePosition(0) +, _topicLengthMsb(0) +, _topicLength(0) +, _ignore(false) +, _packetIdMsb(0) +, _packetId(0) +, _payloadLength(0) +, _payloadBytesRead(0) { + _dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP; + _retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN; + char qosMasked = _parsingInformation->packetFlags & 0x06; + switch (qosMasked) { + case HeaderFlag.PUBLISH_QOS0: + _qos = 0; + break; + case HeaderFlag.PUBLISH_QOS1: + _qos = 1; + break; + case HeaderFlag.PUBLISH_QOS2: + _qos = 2; + break; + } +} + +PublishPacket::~PublishPacket() { +} + +void PublishPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition == 0) { + _topicLengthMsb = currentByte; + } else if (_bytePosition == 1) { + _topicLength = currentByte | _topicLengthMsb << 8; + if (_topicLength > _parsingInformation->maxTopicLength) { + _ignore = true; + } else { + _parsingInformation->topicBuffer[_topicLength] = '\0'; + } + } else if (_bytePosition >= 2 && _bytePosition < 2 + _topicLength) { + // Starting from here, _ignore might be true + if (!_ignore) _parsingInformation->topicBuffer[_bytePosition - 2] = currentByte; + if (_bytePosition == 2 + _topicLength - 1 && _qos == 0) { + _preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1)); + return; + } + } else if (_bytePosition == 2 + _topicLength) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1)); + } + _bytePosition++; +} + +void PublishPacket::_preparePayloadHandling(uint32_t payloadLength) { + _payloadLength = payloadLength; + if (payloadLength == 0) { + _parsingInformation->bufferState = BufferState::NONE; + if (!_ignore) { + _dataCallback(_parsingInformation->topicBuffer, nullptr, _qos, _dup, _retain, 0, 0, 0, _packetId); + _completeCallback(_packetId, _qos); + } + } else { + _parsingInformation->bufferState = BufferState::PAYLOAD; + } +} + +void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + size_t remainToRead = len - (*currentBytePosition); + if (_payloadBytesRead + remainToRead > _payloadLength) remainToRead = _payloadLength - _payloadBytesRead; + + if (!_ignore) _dataCallback(_parsingInformation->topicBuffer, data + (*currentBytePosition), _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId); + _payloadBytesRead += remainToRead; + (*currentBytePosition) += remainToRead; + + if (_payloadBytesRead == _payloadLength) { + _parsingInformation->bufferState = BufferState::NONE; + if (!_ignore) _completeCallback(_packetId, _qos); + } +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.hpp new file mode 100644 index 00000000..9dc19f0a --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/PublishPacket.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../Flags.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class PublishPacket : public Packet { + public: + explicit PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback); + ~PublishPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnMessageInternalCallback _dataCallback; + OnPublishInternalCallback _completeCallback; + + void _preparePayloadHandling(uint32_t payloadLength); + + bool _dup; + uint8_t _qos; + bool _retain; + + uint8_t _bytePosition; + char _topicLengthMsb; + uint16_t _topicLength; + bool _ignore; + char _packetIdMsb; + uint16_t _packetId; + uint32_t _payloadLength; + uint32_t _payloadBytesRead; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.cpp new file mode 100644 index 00000000..ab899655 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.cpp @@ -0,0 +1,46 @@ +#include "SubAckPacket.hpp" + +using AsyncMqttClientInternals::SubAckPacket; + +SubAckPacket::SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +SubAckPacket::~SubAckPacket() { +} + +void SubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::PAYLOAD; + } +} + +void SubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + char status = data[(*currentBytePosition)++]; + + /* switch (status) { + case 0: + Serial.println("Success QoS 0"); + break; + case 1: + Serial.println("Success QoS 1"); + break; + case 2: + Serial.println("Success QoS 2"); + break; + case 0x80: + Serial.println("Failure"); + break; + } */ + + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId, status); +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.hpp new file mode 100644 index 00000000..011b800a --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/SubAckPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class SubAckPacket : public Packet { + public: + explicit SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback); + ~SubAckPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnSubAckInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.cpp new file mode 100644 index 00000000..a44943d1 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.cpp @@ -0,0 +1,30 @@ +#include "UnsubAckPacket.hpp" + +using AsyncMqttClientInternals::UnsubAckPacket; + +UnsubAckPacket::UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback) +: _parsingInformation(parsingInformation) +, _callback(callback) +, _bytePosition(0) +, _packetIdMsb(0) +, _packetId(0) { +} + +UnsubAckPacket::~UnsubAckPacket() { +} + +void UnsubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) { + char currentByte = data[(*currentBytePosition)++]; + if (_bytePosition++ == 0) { + _packetIdMsb = currentByte; + } else { + _packetId = currentByte | _packetIdMsb << 8; + _parsingInformation->bufferState = BufferState::NONE; + _callback(_packetId); + } +} + +void UnsubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { + (void)data; + (void)currentBytePosition; +} diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.hpp new file mode 100644 index 00000000..ab5b9c5c --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Packets/UnsubAckPacket.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "Arduino.h" +#include "Packet.hpp" +#include "../ParsingInformation.hpp" +#include "../Callbacks.hpp" + +namespace AsyncMqttClientInternals { +class UnsubAckPacket : public Packet { + public: + explicit UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback); + ~UnsubAckPacket(); + + void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition); + void parsePayload(char* data, size_t len, size_t* currentBytePosition); + + private: + ParsingInformation* _parsingInformation; + OnUnsubAckInternalCallback _callback; + + uint8_t _bytePosition; + char _packetIdMsb; + uint16_t _packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/ParsingInformation.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/ParsingInformation.hpp new file mode 100644 index 00000000..2d46f27f --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/ParsingInformation.hpp @@ -0,0 +1,21 @@ +#pragma once + +namespace AsyncMqttClientInternals { +enum class BufferState : uint8_t { + NONE = 0, + REMAINING_LENGTH = 2, + VARIABLE_HEADER = 3, + PAYLOAD = 4 +}; + +struct ParsingInformation { + BufferState bufferState; + + uint16_t maxTopicLength; + char* topicBuffer; + + uint8_t packetType; + uint16_t packetFlags; + uint32_t remainingLength; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Storage.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Storage.hpp new file mode 100644 index 00000000..725307b7 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient/Storage.hpp @@ -0,0 +1,13 @@ +#pragma once + +namespace AsyncMqttClientInternals { +struct PendingPubRel { + uint16_t packetId; +}; + +struct PendingAck { + uint8_t packetType; + uint8_t headerFlag; + uint16_t packetId; +}; +} // namespace AsyncMqttClientInternals diff --git a/wled00/src/dependencies/async-mqtt-client/LICENSE b/wled00/src/dependencies/async-mqtt-client/LICENSE new file mode 100644 index 00000000..a6183c68 --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Marvin Roger + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/wled00/src/dependencies/async-mqtt-client/README.md b/wled00/src/dependencies/async-mqtt-client/README.md new file mode 100644 index 00000000..7ea6db1a --- /dev/null +++ b/wled00/src/dependencies/async-mqtt-client/README.md @@ -0,0 +1,18 @@ +Async MQTT client for ESP8266 and ESP32 (Github: https://github.com/marvinroger/async-mqtt-client) +============================= + +[![Build Status](https://img.shields.io/travis/marvinroger/async-mqtt-client/master.svg?style=flat-square)](https://travis-ci.org/marvinroger/async-mqtt-client) + +An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP) . +## Features + +* Compliant with the 3.1.1 version of the protocol +* Fully asynchronous +* Subscribe at QoS 0, 1 and 2 +* Publish at QoS 0, 1 and 2 +* SSL/TLS support +* Available in the [PlatformIO registry](http://platformio.org/lib/show/346/AsyncMqttClient) + +## Requirements, installation and usage + +The project is documented in the [/docs folder](docs). diff --git a/wled00/src/dependencies/pubsubclient/LICENSE.txt b/wled00/src/dependencies/pubsubclient/LICENSE.txt deleted file mode 100644 index 217df35c..00000000 --- a/wled00/src/dependencies/pubsubclient/LICENSE.txt +++ /dev/null @@ -1,20 +0,0 @@ -Copyright (c) 2008-2015 Nicholas O'Leary - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/wled00/src/dependencies/pubsubclient/PubSubClient.cpp b/wled00/src/dependencies/pubsubclient/PubSubClient.cpp deleted file mode 100644 index d3e5ca5f..00000000 --- a/wled00/src/dependencies/pubsubclient/PubSubClient.cpp +++ /dev/null @@ -1,601 +0,0 @@ -/* - PubSubClient.cpp - A simple client for MQTT. - Nick O'Leary - http://knolleary.net -*/ - -#include "PubSubClient.h" -#include "Arduino.h" - -PubSubClient::PubSubClient() { - this->_state = MQTT_DISCONNECTED; - this->_client = NULL; - this->stream = NULL; - setCallback(NULL); -} - -PubSubClient::PubSubClient(Client& client) { - this->_state = MQTT_DISCONNECTED; - setClient(client); - this->stream = NULL; -} - -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(addr, port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(addr,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(addr, port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(addr,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(ip, port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(ip,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(ip, port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(ip,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - -boolean PubSubClient::connect(const char *id) { - return connect(id,NULL,NULL,0,0,0,0); -} - -boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { - return connect(id,user,pass,0,0,0,0); -} - -boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); -} - -boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - if (!connected()) { - int result = 0; - - if (domain != NULL) { - result = _client->connect(this->domain, this->port); - } else { - result = _client->connect(this->ip, this->port); - } - if (result == 1) { - nextMsgId = 1; - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - unsigned int j; - -#if MQTT_VERSION == MQTT_VERSION_3_1 - uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; -#define MQTT_HEADER_VERSION_LENGTH 9 -#elif MQTT_VERSION == MQTT_VERSION_3_1_1 - uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; -#define MQTT_HEADER_VERSION_LENGTH 7 -#endif - for (j = 0;j>1); - } - } - - buffer[length++] = v; - - buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id,buffer,length); - if (willTopic) { - length = writeString(willTopic,buffer,length); - length = writeString(willMessage,buffer,length); - } - - if(user != NULL) { - length = writeString(user,buffer,length); - if(pass != NULL) { - length = writeString(pass,buffer,length); - } - } - - write(MQTTCONNECT,buffer,length-5); - - lastInActivity = lastOutActivity = millis(); - - while (!_client->available()) { - unsigned long t = millis(); - if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { - _state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; - } - } - uint8_t llen; - uint16_t len = readPacket(&llen); - - if (len == 4) { - if (buffer[3] == 0) { - lastInActivity = millis(); - pingOutstanding = false; - _state = MQTT_CONNECTED; - return true; - } else { - _state = buffer[3]; - } - } - _client->stop(); - } else { - _state = MQTT_CONNECT_FAILED; - } - return false; - } - return true; -} - -// reads a byte into result -boolean PubSubClient::readByte(uint8_t * result) { - uint32_t previousMillis = millis(); - while(!_client->available()) { - uint32_t currentMillis = millis(); - if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ - return false; - } - } - *result = _client->read(); - return true; -} - -// reads a byte into result[*index] and increments index -boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ - uint16_t current_index = *index; - uint8_t * write_address = &(result[current_index]); - if(readByte(write_address)){ - *index = current_index + 1; - return true; - } - return false; -} - -uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { - uint16_t len = 0; - if(!readByte(buffer, &len)) return 0; - bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; - uint32_t multiplier = 1; - uint16_t length = 0; - uint8_t digit = 0; - uint16_t skip = 0; - uint8_t start = 0; - - do { - if (len == 6) { - // Invalid remaining length encoding - kill the connection - _state = MQTT_DISCONNECTED; - _client->stop(); - return 0; - } - if(!readByte(&digit)) return 0; - buffer[len++] = digit; - length += (digit & 127) * multiplier; - multiplier *= 128; - } while ((digit & 128) != 0); - *lengthLength = len-1; - - if (isPublish) { - // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(buffer, &len)) return 0; - if(!readByte(buffer, &len)) return 0; - skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; - start = 2; - if (buffer[0]&MQTTQOS1) { - // skip message id - skip += 2; - } - } - - for (uint16_t i = start;istream) { - if (isPublish && len-*lengthLength-2>skip) { - this->stream->write(digit); - } - } - if (len < MQTT_MAX_PACKET_SIZE) { - buffer[len] = digit; - } - len++; - } - - if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { - len = 0; // This will cause the packet to be ignored. - } - - return len; -} - -boolean PubSubClient::loop() { - if (connected()) { - unsigned long t = millis(); - if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { - if (pingOutstanding) { - this->_state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; - } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ - memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) buffer+llen+2; - - // make sure payload can be interpreted as 'C' string - buffer[(len < MQTT_MAX_PACKET_SIZE) ? len : MQTT_MAX_PACKET_SIZE -1] = 0; - - // msgId only present for QOS>0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); - lastOutActivity = t; - - } else { - payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; - } - } else if (!connected()) { - // readPacket has closed the connection - return false; - } - } - return true; - } - return false; -} - -boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload,strlen(payload),false); -} - -boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload,strlen(payload),retained); -} - -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { - return publish(topic, payload, plength, false); -} - -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { - if (connected()) { - if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { - // Too long - return false; - } - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - length = writeString(topic,buffer,length); - uint16_t i; - for (i=0;i 0) { - digit |= 0x80; - } - buffer[pos++] = digit; - llen++; - } while(len>0); - - pos = writeString(topic,buffer,pos); - - rc += _client->write(buffer,pos); - - for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); - } - - lastOutActivity = millis(); - - return rc == tlen + 4 + plength; -} - -boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { - uint8_t lenBuf[4]; - uint8_t llen = 0; - uint8_t digit; - uint8_t pos = 0; - uint16_t rc; - uint16_t len = length; - do { - digit = len % 128; - len = len / 128; - if (len > 0) { - digit |= 0x80; - } - lenBuf[pos++] = digit; - llen++; - } while(len>0); - - buf[4-llen] = header; - for (int i=0;i 0) && result) { - bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining; - rc = _client->write(writeBuf,bytesToWrite); - result = (rc == bytesToWrite); - bytesRemaining -= rc; - writeBuf += rc; - } - return result; -#else - rc = _client->write(buf+(4-llen),length+1+llen); - lastOutActivity = millis(); - return (rc == 1+llen+length); -#endif -} - -boolean PubSubClient::subscribe(const char* topic) { - return subscribe(topic, 0); -} - -boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - if (qos > 1) { - return false; - } - if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { - // Too long - return false; - } - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, buffer,length); - buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); - } - return false; -} - -boolean PubSubClient::unsubscribe(const char* topic) { - if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { - // Too long - return false; - } - if (connected()) { - uint16_t length = 5; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); - } - return false; -} - -void PubSubClient::disconnect() { - buffer[0] = MQTTDISCONNECT; - buffer[1] = 0; - _client->write(buffer,2); - _state = MQTT_DISCONNECTED; - _client->stop(); - lastInActivity = lastOutActivity = millis(); -} - -uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { - const char* idp = string; - uint16_t i = 0; - pos += 2; - while (*idp) { - buf[pos++] = *idp++; - i++; - } - buf[pos-i-2] = (i >> 8); - buf[pos-i-1] = (i & 0xFF); - return pos; -} - - -boolean PubSubClient::connected() { - boolean rc; - if (_client == NULL ) { - rc = false; - } else { - rc = (int)_client->connected(); - if (!rc) { - if (this->_state == MQTT_CONNECTED) { - this->_state = MQTT_CONNECTION_LOST; - _client->flush(); - _client->stop(); - } - } - } - return rc; -} - -PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { - IPAddress addr(ip[0],ip[1],ip[2],ip[3]); - return setServer(addr,port); -} - -PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { - this->ip = ip; - this->port = port; - this->domain = NULL; - return *this; -} - -PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { - this->domain = domain; - this->port = port; - return *this; -} - -PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { - this->callback = callback; - return *this; -} - -PubSubClient& PubSubClient::setClient(Client& client){ - this->_client = &client; - return *this; -} - -PubSubClient& PubSubClient::setStream(Stream& stream){ - this->stream = &stream; - return *this; -} - -int PubSubClient::state() { - return this->_state; -} diff --git a/wled00/src/dependencies/pubsubclient/PubSubClient.h b/wled00/src/dependencies/pubsubclient/PubSubClient.h deleted file mode 100644 index 9eaa3296..00000000 --- a/wled00/src/dependencies/pubsubclient/PubSubClient.h +++ /dev/null @@ -1,144 +0,0 @@ -/* - PubSubClient.h - A simple client for MQTT. - Nick O'Leary - http://knolleary.net -*/ - -#ifndef PubSubClient_h -#define PubSubClient_h - -#include -#include "IPAddress.h" -#include "Client.h" -#include "Stream.h" - -#define MQTT_VERSION_3_1 3 -#define MQTT_VERSION_3_1_1 4 - -// MQTT_VERSION : Pick the version -//#define MQTT_VERSION MQTT_VERSION_3_1 -#ifndef MQTT_VERSION -#define MQTT_VERSION MQTT_VERSION_3_1_1 -#endif - -// MQTT_MAX_PACKET_SIZE : Maximum packet size -#ifndef MQTT_MAX_PACKET_SIZE -#define MQTT_MAX_PACKET_SIZE 128 -#endif - -// MQTT_KEEPALIVE : keepAlive interval in Seconds -#ifndef MQTT_KEEPALIVE -#define MQTT_KEEPALIVE 60 -#endif - -// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds -#ifndef MQTT_SOCKET_TIMEOUT -#define MQTT_SOCKET_TIMEOUT 62 -#endif - -// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client -// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to -// pass the entire MQTT packet in each write call. -//#define MQTT_MAX_TRANSFER_SIZE 80 - -// Possible values for client.state() -#define MQTT_CONNECTION_TIMEOUT -4 -#define MQTT_CONNECTION_LOST -3 -#define MQTT_CONNECT_FAILED -2 -#define MQTT_DISCONNECTED -1 -#define MQTT_CONNECTED 0 -#define MQTT_CONNECT_BAD_PROTOCOL 1 -#define MQTT_CONNECT_BAD_CLIENT_ID 2 -#define MQTT_CONNECT_UNAVAILABLE 3 -#define MQTT_CONNECT_BAD_CREDENTIALS 4 -#define MQTT_CONNECT_UNAUTHORIZED 5 - -#define MQTTCONNECT 1 << 4 // Client request to connect to Server -#define MQTTCONNACK 2 << 4 // Connect Acknowledgment -#define MQTTPUBLISH 3 << 4 // Publish message -#define MQTTPUBACK 4 << 4 // Publish Acknowledgment -#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) -#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) -#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) -#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request -#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment -#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request -#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment -#define MQTTPINGREQ 12 << 4 // PING Request -#define MQTTPINGRESP 13 << 4 // PING Response -#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting -#define MQTTReserved 15 << 4 // Reserved - -#define MQTTQOS0 (0 << 1) -#define MQTTQOS1 (1 << 1) -#define MQTTQOS2 (2 << 1) - -#ifdef ESP8266 -#include -#define MQTT_CALLBACK_SIGNATURE std::function callback -#else -#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) -#endif - -class PubSubClient { -private: - Client* _client; - uint8_t buffer[MQTT_MAX_PACKET_SIZE]; - uint16_t nextMsgId; - unsigned long lastOutActivity; - unsigned long lastInActivity; - bool pingOutstanding; - MQTT_CALLBACK_SIGNATURE; - uint16_t readPacket(uint8_t*); - boolean readByte(uint8_t * result); - boolean readByte(uint8_t * result, uint16_t * index); - boolean write(uint8_t header, uint8_t* buf, uint16_t length); - uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); - IPAddress ip; - const char* domain; - uint16_t port; - Stream* stream; - int _state; -public: - PubSubClient(); - PubSubClient(Client& client); - PubSubClient(IPAddress, uint16_t, Client& client); - PubSubClient(IPAddress, uint16_t, Client& client, Stream&); - PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); - PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - PubSubClient(uint8_t *, uint16_t, Client& client); - PubSubClient(uint8_t *, uint16_t, Client& client, Stream&); - PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); - PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - PubSubClient(const char*, uint16_t, Client& client); - PubSubClient(const char*, uint16_t, Client& client, Stream&); - PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); - PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - - PubSubClient& setServer(IPAddress ip, uint16_t port); - PubSubClient& setServer(uint8_t * ip, uint16_t port); - PubSubClient& setServer(const char * domain, uint16_t port); - PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); - PubSubClient& setClient(Client& client); - PubSubClient& setStream(Stream& stream); - - boolean connect(const char* id); - boolean connect(const char* id, const char* user, const char* pass); - boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); - boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); - void disconnect(); - boolean publish(const char* topic, const char* payload); - boolean publish(const char* topic, const char* payload, boolean retained); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); - boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); - boolean subscribe(const char* topic); - boolean subscribe(const char* topic, uint8_t qos); - boolean unsubscribe(const char* topic); - boolean loop(); - boolean connected(); - int state(); -}; - - -#endif diff --git a/wled00/wled00.ino b/wled00/wled00.ino index 2ce6af43..d0cfb708 100644 --- a/wled00/wled00.ino +++ b/wled00/wled00.ino @@ -70,7 +70,7 @@ #include "src/dependencies/blynk/BlynkSimpleEsp.h" #endif #include "src/dependencies/e131/E131.h" -#include "src/dependencies/pubsubclient/PubSubClient.h" +#include "src/dependencies/async-mqtt-client/AsyncMqttClient.h" #include "html_classic.h" #include "html_mobile.h" #include "html_settings.h" @@ -80,7 +80,7 @@ //version code in format yymmddb (b = daily build) -#define VERSION 1902172 +#define VERSION 1902173 char versionString[] = "0.8.4-dev"; @@ -359,7 +359,6 @@ IPAddress realtimeIP = (0,0,0,0); unsigned long realtimeTimeout = 0; //mqtt -bool mqttInit = false; long lastMQTTReconnectAttempt = 0; long lastInterfaceUpdate = 0; byte interfaceUpdateCallMode = 0; @@ -403,8 +402,7 @@ bool doReboot = false; //flag to initiate reboot from async handlers //server library objects AsyncWebServer server(80); HTTPClient* hueClient = NULL; -WiFiClient* mqttTCPClient = NULL; -PubSubClient* mqtt = NULL; +AsyncMqttClient* mqtt = NULL; #ifndef WLED_DISABLE_OTA //ESP8266HTTPUpdateServer httpUpdater; @@ -511,11 +509,7 @@ void loop() { handleButton(); handleIR(); handleNetworkTime(); - if (!onlyAP) - { - handleAlexa(); - handleMQTT(); - } + if (!onlyAP) handleAlexa(); handleOverlays(); diff --git a/wled00/wled05_init.ino b/wled00/wled05_init.ino index 1a786d6a..08904430 100644 --- a/wled00/wled05_init.ino +++ b/wled00/wled05_init.ino @@ -72,9 +72,8 @@ void wledInit() //smartInit, we only init some resources when connected if (!onlyAP && WiFi.status() == WL_CONNECTED) { - mqttTCPClient = new WiFiClient(); - mqtt = new PubSubClient(*mqttTCPClient); - mqttInit = initMQTT(); + mqtt = new AsyncMqttClient(); + initMqtt(); } strip.service(); diff --git a/wled00/wled08_led.ino b/wled00/wled08_led.ino index 3de46df6..81ad90e3 100644 --- a/wled00/wled08_led.ino +++ b/wled00/wled08_led.ino @@ -151,7 +151,7 @@ void colorUpdated(int callMode) void updateInterfaces(uint8_t callMode) { if (callMode != 9 && callMode != 5) updateBlynk(); - publishMQTT(); + publishMqtt(); lastInterfaceUpdate = millis(); } diff --git a/wled00/wled17_mqtt.ino b/wled00/wled17_mqtt.ino index 6502250e..14bdf5f7 100644 --- a/wled00/wled17_mqtt.ino +++ b/wled00/wled17_mqtt.ino @@ -17,11 +17,42 @@ void parseMQTTBriPayload(char* payload) } -void callbackMQTT(char* topic, byte* payload, unsigned int length) { +void onMqttConnect(bool sessionPresent) +{ + //(re)subscribe to required topics + char subuf[38]; + strcpy(subuf, mqttDeviceTopic); + + if (mqttDeviceTopic[0] != 0) + { + strcpy(subuf, mqttDeviceTopic); + mqtt->subscribe(subuf, 0); + strcat(subuf, "/col"); + mqtt->subscribe(subuf, 0); + strcpy(subuf, mqttDeviceTopic); + strcat(subuf, "/api"); + mqtt->subscribe(subuf, 0); + } + + if (mqttGroupTopic[0] != 0) + { + strcpy(subuf, mqttGroupTopic); + mqtt->subscribe(subuf, 0); + strcat(subuf, "/col"); + mqtt->subscribe(subuf, 0); + strcpy(subuf, mqttGroupTopic); + strcat(subuf, "/api"); + mqtt->subscribe(subuf, 0); + } + publishMqtt(); +} + + +void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { DEBUG_PRINT("MQTT callb rec: "); DEBUG_PRINTLN(topic); - DEBUG_PRINTLN((char*)payload); + DEBUG_PRINTLN(payload); //no need to check the topic because we only get topics we are subscribed to @@ -34,14 +65,11 @@ void callbackMQTT(char* topic, byte* payload, unsigned int length) { String apireq = "win&"; apireq += (char*)payload; handleSet(nullptr, apireq); - } else - { - parseMQTTBriPayload((char*)payload); - } + } else parseMQTTBriPayload(payload); } -void publishMQTT() +void publishMqtt() { if (mqtt == NULL) return; if (!mqtt->connected()) return; @@ -53,59 +81,21 @@ void publishMQTT() sprintf(s, "%ld", bri); strcpy(subuf, mqttDeviceTopic); strcat(subuf, "/g"); - mqtt->publish(subuf, s); + mqtt->publish(subuf, 0, true, s); sprintf(s, "#%X", col[3]*16777216 + col[0]*65536 + col[1]*256 + col[2]); strcpy(subuf, mqttDeviceTopic); strcat(subuf, "/c"); - mqtt->publish(subuf, s); + mqtt->publish(subuf, 0, true, s); - //if you want to use this, increase the MQTT buffer in PubSubClient.h to 350+ - //it will publish the API response to MQTT - /*XML_response(nullptr, false); + XML_response(nullptr, false); strcpy(subuf, mqttDeviceTopic); strcat(subuf, "/v"); - mqtt->publish(subuf, obuf);*/ + mqtt->publish(subuf, 0, true, obuf); } -bool reconnectMQTT() -{ - if (mqtt->connect(escapedMac.c_str())) - { - //re-subscribe to required topics - char subuf[38]; - strcpy(subuf, mqttDeviceTopic); - - if (mqttDeviceTopic[0] != 0) - { - strcpy(subuf, mqttDeviceTopic); - mqtt->subscribe(subuf); - strcat(subuf, "/col"); - mqtt->subscribe(subuf); - strcpy(subuf, mqttDeviceTopic); - strcat(subuf, "/api"); - mqtt->subscribe(subuf); - } - - if (mqttGroupTopic[0] != 0) - { - strcpy(subuf, mqttGroupTopic); - mqtt->subscribe(subuf); - strcat(subuf, "/col"); - mqtt->subscribe(subuf); - strcpy(subuf, mqttGroupTopic); - strcat(subuf, "/api"); - mqtt->subscribe(subuf); - } - - publishMQTT(); - } - return mqtt->connected(); -} - - -bool initMQTT() +bool initMqtt() { if (WiFi.status() != WL_CONNECTED) return false; if (mqttServer[0] == 0) return false; @@ -117,29 +107,10 @@ bool initMQTT() } else { mqtt->setServer(mqttServer, WLED_MQTT_PORT); } - mqtt->setCallback(callbackMQTT); + mqtt->setClientId(escapedMac.c_str()); + mqtt->onMessage(onMqttMessage); + mqtt->onConnect(onMqttConnect); + mqtt->connect(); DEBUG_PRINTLN("MQTT ready."); return true; } - - -void handleMQTT() -{ - if (WiFi.status() != WL_CONNECTED || !mqttInit) return; - - //every time connection is unsuccessful, the attempt interval is increased, since attempt will block program for 7 sec each time - if (!mqtt->connected() && millis() - lastMQTTReconnectAttempt > 5000 + (5000 * mqttFailedConAttempts * mqttFailedConAttempts)) - { - DEBUG_PRINTLN("Attempting to connect MQTT..."); - lastMQTTReconnectAttempt = millis(); - if (!reconnectMQTT()) - { - //still attempt reconnect about once daily - if (mqttFailedConAttempts < 120) mqttFailedConAttempts++; - return; - } - DEBUG_PRINTLN("MQTT con!"); - mqttFailedConAttempts = 0; - } - mqtt->loop(); -} diff --git a/wled00/wled18_server.ino b/wled00/wled18_server.ino index 5134402a..72c33924 100644 --- a/wled00/wled18_server.ino +++ b/wled00/wled18_server.ino @@ -134,7 +134,7 @@ void initServer() //init ota page #ifndef WLED_DISABLE_OTA server.on("/update", HTTP_GET, [](AsyncWebServerRequest *request){ - serveMessage(request, 200, "WLED Software Update", "Your installed version: " + String(versionString) + "
Download the latest binary: " + serveMessage(request, 200, "WLED Software Update", "Installed version: " + String(versionString) + "
Download the latest binary: " "" "
" "
", 254);