diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp index b3eff719..f62e1ef4 100644 --- a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp @@ -38,6 +38,7 @@ AsyncMqttClient::AsyncMqttClient() #ifdef ESP32 sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac()); + _xSemaphore = xSemaphoreCreateMutex(); #elif defined(ESP8266) sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); #endif @@ -49,6 +50,9 @@ AsyncMqttClient::AsyncMqttClient() AsyncMqttClient::~AsyncMqttClient() { delete _currentParsedPacket; delete[] _parsingInformation.topicBuffer; +#ifdef ESP32 + vSemaphoreDelete(_xSemaphore); +#endif } AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { @@ -298,9 +302,11 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) { neededSpace += passwordLength; } + SEMAPHORE_TAKE(); if (_client.space() < neededSpace) { _connectPacketNotEnoughSpace = true; _client.close(true); + SEMAPHORE_GIVE(); return; } @@ -329,26 +335,24 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) { } _client.send(); _lastClientActivity = millis(); + SEMAPHORE_GIVE(); } void AsyncMqttClient::_onDisconnect(AsyncClient* client) { (void)client; - AsyncMqttClientDisconnectReason reason; + if (!_disconnectFlagged) { + AsyncMqttClientDisconnectReason reason; - if (_connectPacketNotEnoughSpace) { - reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE; - } else if (_tlsBadFingerprint) { - reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; - } else { - reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; + if (_connectPacketNotEnoughSpace) { + reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE; + } else if (_tlsBadFingerprint) { + reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; + } else { + reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; + } + for (auto callback : _onDisconnectUserCallbacks) callback(reason); } - _clear(); - - for (auto callback : _onDisconnectUserCallbacks) callback(reason); - - _connectPacketNotEnoughSpace = false; - _tlsBadFingerprint = false; } void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) { @@ -481,8 +485,8 @@ void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) _connected = true; for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); } else { - _clear(); for (auto callback : _onDisconnectUserCallbacks) callback(static_cast(connectReturnCode)); + _disconnectFlagged = true; } } @@ -606,19 +610,22 @@ bool AsyncMqttClient::_sendPing() { size_t neededSpace = 2; - if (_client.space() < neededSpace) return false; + SEMAPHORE_TAKE(false); + if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; } _client.add(fixedHeader, 2); _client.send(); _lastClientActivity = millis(); _lastPingRequestTime = millis(); + SEMAPHORE_GIVE(); return true; } void AsyncMqttClient::_sendAcks() { uint8_t neededAckSpace = 2 + 2; + SEMAPHORE_TAKE(); for (size_t i = 0; i < _toSendAcks.size(); i++) { if (_client.space() < neededAckSpace) break; @@ -643,12 +650,17 @@ void AsyncMqttClient::_sendAcks() { _lastClientActivity = millis(); } + SEMAPHORE_GIVE(); } bool AsyncMqttClient::_sendDisconnect() { + if (!_connected) return true; + const uint8_t neededSpace = 2; - if (_client.space() < neededSpace) return false; + SEMAPHORE_TAKE(false); + + if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; } char fixedHeader[2]; fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT; @@ -662,6 +674,7 @@ bool AsyncMqttClient::_sendDisconnect() { _disconnectFlagged = false; + SEMAPHORE_GIVE(); return true; } @@ -704,7 +717,6 @@ void AsyncMqttClient::disconnect(bool force) { } else { _disconnectFlagged = true; _sendDisconnect(); - _client.send(); } } @@ -732,7 +744,9 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { neededSpace += 2; neededSpace += topicLength; neededSpace += 1; - if (_client.space() < neededSpace) return 0; + + SEMAPHORE_TAKE(0); + if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; } uint16_t packetId = _getNextPacketId(); char packetIdBytes[2]; @@ -747,6 +761,7 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { _client.send(); _lastClientActivity = millis(); + SEMAPHORE_GIVE(); return packetId; } @@ -770,7 +785,9 @@ uint16_t AsyncMqttClient::unsubscribe(const char* topic) { neededSpace += 2; neededSpace += 2; neededSpace += topicLength; - if (_client.space() < neededSpace) return 0; + + SEMAPHORE_TAKE(0); + if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; } uint16_t packetId = _getNextPacketId(); char packetIdBytes[2]; @@ -784,6 +801,7 @@ uint16_t AsyncMqttClient::unsubscribe(const char* topic) { _client.send(); _lastClientActivity = millis(); + SEMAPHORE_GIVE(); return packetId; } @@ -825,7 +843,9 @@ uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, c neededSpace += topicLength; if (qos != 0) neededSpace += 2; if (payload != nullptr) neededSpace += payloadLength; - if (_client.space() < neededSpace) return 0; + + SEMAPHORE_TAKE(0); + if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; } uint16_t packetId = 0; char packetIdBytes[2]; @@ -848,6 +868,7 @@ uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, c _client.send(); _lastClientActivity = millis(); + SEMAPHORE_GIVE(); if (qos != 0) { return packetId; } else { diff --git a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp index f4191f90..af8332b2 100644 --- a/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp +++ b/wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.hpp @@ -7,6 +7,7 @@ #ifdef ESP32 #include +#include #elif defined(ESP8266) #include #else @@ -37,6 +38,14 @@ #include "AsyncMqttClient/Packets/PubRecPacket.hpp" #include "AsyncMqttClient/Packets/PubCompPacket.hpp" +#if ESP32 +#define SEMAPHORE_TAKE(X) if (xSemaphoreTake(_xSemaphore, 1000 / portTICK_PERIOD_MS) != pdTRUE) { return X; } // Waits max 1000ms +#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore); +#elif defined(ESP8266) +#define SEMAPHORE_TAKE(X) void() +#define SEMAPHORE_GIVE() void() +#endif + class AsyncMqttClient { public: AsyncMqttClient(); @@ -121,6 +130,10 @@ class AsyncMqttClient { std::vector _toSendAcks; +#ifdef ESP32 + SemaphoreHandle_t _xSemaphore = nullptr; +#endif + void _clear(); void _freeCurrentParsedPacket(); diff --git a/wled00/wled00.ino b/wled00/wled00.ino index e66e8c67..114dd2c2 100644 --- a/wled00/wled00.ino +++ b/wled00/wled00.ino @@ -99,7 +99,7 @@ //version code in format yymmddb (b = daily build) -#define VERSION 1910182 +#define VERSION 1910201 char versionString[] = "0.8.5"; @@ -449,6 +449,7 @@ WS2812FX strip = WS2812FX(); unsigned long debugTime = 0; int lastWifiState = 3; unsigned long wifiStateChangedTime = 0; + int loops = 0; #else #define DEBUG_PRINT(x) #define DEBUG_PRINTLN(x) @@ -563,7 +564,10 @@ void loop() { DEBUG_PRINT("State time: "); DEBUG_PRINTLN(wifiStateChangedTime); DEBUG_PRINT("NTP last sync: "); DEBUG_PRINTLN(ntpLastSyncTime); DEBUG_PRINT("Client IP: "); DEBUG_PRINTLN(WiFi.localIP()); + DEBUG_PRINT("Loops/sec: "); DEBUG_PRINTLN(loops/10); + loops = 0; debugTime = millis(); } + loops++; #endif } diff --git a/wled00/wled05_init.ino b/wled00/wled05_init.ino index 6b2370b9..70ac7d7e 100644 --- a/wled00/wled05_init.ino +++ b/wled00/wled05_init.ino @@ -188,13 +188,15 @@ void initConnection() lastReconnectAttempt = millis(); - if (apAlwaysOn) - { - initAP(); - } else if (!apActive) - { - DEBUG_PRINTLN("Access point disabled."); - WiFi.softAPdisconnect(true); + if (!apActive) { + if (apAlwaysOn) + { + initAP(); + } else + { + DEBUG_PRINTLN("Access point disabled."); + WiFi.softAPdisconnect(true); + } } if (!WLED_WIFI_CONFIGURED) diff --git a/wled00/wled17_mqtt.ino b/wled00/wled17_mqtt.ino index 2dc361df..7672508e 100644 --- a/wled00/wled17_mqtt.ino +++ b/wled00/wled17_mqtt.ino @@ -215,7 +215,11 @@ bool initMqtt() lastMqttReconnectAttempt = millis(); if (mqttServer[0] == 0 || !WLED_CONNECTED) return false; - if (mqtt == nullptr) mqtt = new AsyncMqttClient(); + if (mqtt == nullptr) { + mqtt = new AsyncMqttClient(); + mqtt->onMessage(onMqttMessage); + mqtt->onConnect(onMqttConnect); + } if (mqtt->connected()) return true; DEBUG_PRINTLN("Reconnecting MQTT"); @@ -228,8 +232,6 @@ bool initMqtt() } mqtt->setClientId(mqttClientID); if (mqttUser[0] && mqttPass[0]) mqtt->setCredentials(mqttUser, mqttPass); - mqtt->onMessage(onMqttMessage); - mqtt->onConnect(onMqttConnect); mqtt->connect(); return true; }