Update MQTT library

This commit is contained in:
cschwinne 2019-10-20 12:48:29 +02:00
parent be185b46a7
commit 0d3a8ce31b
5 changed files with 73 additions and 31 deletions

View File

@ -38,6 +38,7 @@ AsyncMqttClient::AsyncMqttClient()
#ifdef ESP32 #ifdef ESP32
sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac()); sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac());
_xSemaphore = xSemaphoreCreateMutex();
#elif defined(ESP8266) #elif defined(ESP8266)
sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId());
#endif #endif
@ -49,6 +50,9 @@ AsyncMqttClient::AsyncMqttClient()
AsyncMqttClient::~AsyncMqttClient() { AsyncMqttClient::~AsyncMqttClient() {
delete _currentParsedPacket; delete _currentParsedPacket;
delete[] _parsingInformation.topicBuffer; delete[] _parsingInformation.topicBuffer;
#ifdef ESP32
vSemaphoreDelete(_xSemaphore);
#endif
} }
AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) {
@ -298,9 +302,11 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) {
neededSpace += passwordLength; neededSpace += passwordLength;
} }
SEMAPHORE_TAKE();
if (_client.space() < neededSpace) { if (_client.space() < neededSpace) {
_connectPacketNotEnoughSpace = true; _connectPacketNotEnoughSpace = true;
_client.close(true); _client.close(true);
SEMAPHORE_GIVE();
return; return;
} }
@ -329,10 +335,12 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) {
} }
_client.send(); _client.send();
_lastClientActivity = millis(); _lastClientActivity = millis();
SEMAPHORE_GIVE();
} }
void AsyncMqttClient::_onDisconnect(AsyncClient* client) { void AsyncMqttClient::_onDisconnect(AsyncClient* client) {
(void)client; (void)client;
if (!_disconnectFlagged) {
AsyncMqttClientDisconnectReason reason; AsyncMqttClientDisconnectReason reason;
if (_connectPacketNotEnoughSpace) { if (_connectPacketNotEnoughSpace) {
@ -342,13 +350,9 @@ void AsyncMqttClient::_onDisconnect(AsyncClient* client) {
} else { } else {
reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED;
} }
_clear();
for (auto callback : _onDisconnectUserCallbacks) callback(reason); for (auto callback : _onDisconnectUserCallbacks) callback(reason);
}
_connectPacketNotEnoughSpace = false; _clear();
_tlsBadFingerprint = false;
} }
void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) { void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) {
@ -481,8 +485,8 @@ void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode)
_connected = true; _connected = true;
for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); for (auto callback : _onConnectUserCallbacks) callback(sessionPresent);
} else { } else {
_clear();
for (auto callback : _onDisconnectUserCallbacks) callback(static_cast<AsyncMqttClientDisconnectReason>(connectReturnCode)); for (auto callback : _onDisconnectUserCallbacks) callback(static_cast<AsyncMqttClientDisconnectReason>(connectReturnCode));
_disconnectFlagged = true;
} }
} }
@ -606,19 +610,22 @@ bool AsyncMqttClient::_sendPing() {
size_t neededSpace = 2; size_t neededSpace = 2;
if (_client.space() < neededSpace) return false; SEMAPHORE_TAKE(false);
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; }
_client.add(fixedHeader, 2); _client.add(fixedHeader, 2);
_client.send(); _client.send();
_lastClientActivity = millis(); _lastClientActivity = millis();
_lastPingRequestTime = millis(); _lastPingRequestTime = millis();
SEMAPHORE_GIVE();
return true; return true;
} }
void AsyncMqttClient::_sendAcks() { void AsyncMqttClient::_sendAcks() {
uint8_t neededAckSpace = 2 + 2; uint8_t neededAckSpace = 2 + 2;
SEMAPHORE_TAKE();
for (size_t i = 0; i < _toSendAcks.size(); i++) { for (size_t i = 0; i < _toSendAcks.size(); i++) {
if (_client.space() < neededAckSpace) break; if (_client.space() < neededAckSpace) break;
@ -643,12 +650,17 @@ void AsyncMqttClient::_sendAcks() {
_lastClientActivity = millis(); _lastClientActivity = millis();
} }
SEMAPHORE_GIVE();
} }
bool AsyncMqttClient::_sendDisconnect() { bool AsyncMqttClient::_sendDisconnect() {
if (!_connected) return true;
const uint8_t neededSpace = 2; const uint8_t neededSpace = 2;
if (_client.space() < neededSpace) return false; SEMAPHORE_TAKE(false);
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; }
char fixedHeader[2]; char fixedHeader[2];
fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT; fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
@ -662,6 +674,7 @@ bool AsyncMqttClient::_sendDisconnect() {
_disconnectFlagged = false; _disconnectFlagged = false;
SEMAPHORE_GIVE();
return true; return true;
} }
@ -704,7 +717,6 @@ void AsyncMqttClient::disconnect(bool force) {
} else { } else {
_disconnectFlagged = true; _disconnectFlagged = true;
_sendDisconnect(); _sendDisconnect();
_client.send();
} }
} }
@ -732,7 +744,9 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
neededSpace += 2; neededSpace += 2;
neededSpace += topicLength; neededSpace += topicLength;
neededSpace += 1; neededSpace += 1;
if (_client.space() < neededSpace) return 0;
SEMAPHORE_TAKE(0);
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
uint16_t packetId = _getNextPacketId(); uint16_t packetId = _getNextPacketId();
char packetIdBytes[2]; char packetIdBytes[2];
@ -747,6 +761,7 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
_client.send(); _client.send();
_lastClientActivity = millis(); _lastClientActivity = millis();
SEMAPHORE_GIVE();
return packetId; return packetId;
} }
@ -770,7 +785,9 @@ uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
neededSpace += 2; neededSpace += 2;
neededSpace += 2; neededSpace += 2;
neededSpace += topicLength; neededSpace += topicLength;
if (_client.space() < neededSpace) return 0;
SEMAPHORE_TAKE(0);
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
uint16_t packetId = _getNextPacketId(); uint16_t packetId = _getNextPacketId();
char packetIdBytes[2]; char packetIdBytes[2];
@ -784,6 +801,7 @@ uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
_client.send(); _client.send();
_lastClientActivity = millis(); _lastClientActivity = millis();
SEMAPHORE_GIVE();
return packetId; return packetId;
} }
@ -825,7 +843,9 @@ uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, c
neededSpace += topicLength; neededSpace += topicLength;
if (qos != 0) neededSpace += 2; if (qos != 0) neededSpace += 2;
if (payload != nullptr) neededSpace += payloadLength; if (payload != nullptr) neededSpace += payloadLength;
if (_client.space() < neededSpace) return 0;
SEMAPHORE_TAKE(0);
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
uint16_t packetId = 0; uint16_t packetId = 0;
char packetIdBytes[2]; char packetIdBytes[2];
@ -848,6 +868,7 @@ uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, c
_client.send(); _client.send();
_lastClientActivity = millis(); _lastClientActivity = millis();
SEMAPHORE_GIVE();
if (qos != 0) { if (qos != 0) {
return packetId; return packetId;
} else { } else {

View File

@ -7,6 +7,7 @@
#ifdef ESP32 #ifdef ESP32
#include <AsyncTCP.h> #include <AsyncTCP.h>
#include <freertos/semphr.h>
#elif defined(ESP8266) #elif defined(ESP8266)
#include <ESPAsyncTCP.h> #include <ESPAsyncTCP.h>
#else #else
@ -37,6 +38,14 @@
#include "AsyncMqttClient/Packets/PubRecPacket.hpp" #include "AsyncMqttClient/Packets/PubRecPacket.hpp"
#include "AsyncMqttClient/Packets/PubCompPacket.hpp" #include "AsyncMqttClient/Packets/PubCompPacket.hpp"
#if ESP32
#define SEMAPHORE_TAKE(X) if (xSemaphoreTake(_xSemaphore, 1000 / portTICK_PERIOD_MS) != pdTRUE) { return X; } // Waits max 1000ms
#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore);
#elif defined(ESP8266)
#define SEMAPHORE_TAKE(X) void()
#define SEMAPHORE_GIVE() void()
#endif
class AsyncMqttClient { class AsyncMqttClient {
public: public:
AsyncMqttClient(); AsyncMqttClient();
@ -121,6 +130,10 @@ class AsyncMqttClient {
std::vector<AsyncMqttClientInternals::PendingAck> _toSendAcks; std::vector<AsyncMqttClientInternals::PendingAck> _toSendAcks;
#ifdef ESP32
SemaphoreHandle_t _xSemaphore = nullptr;
#endif
void _clear(); void _clear();
void _freeCurrentParsedPacket(); void _freeCurrentParsedPacket();

View File

@ -99,7 +99,7 @@
//version code in format yymmddb (b = daily build) //version code in format yymmddb (b = daily build)
#define VERSION 1910182 #define VERSION 1910201
char versionString[] = "0.8.5"; char versionString[] = "0.8.5";
@ -449,6 +449,7 @@ WS2812FX strip = WS2812FX();
unsigned long debugTime = 0; unsigned long debugTime = 0;
int lastWifiState = 3; int lastWifiState = 3;
unsigned long wifiStateChangedTime = 0; unsigned long wifiStateChangedTime = 0;
int loops = 0;
#else #else
#define DEBUG_PRINT(x) #define DEBUG_PRINT(x)
#define DEBUG_PRINTLN(x) #define DEBUG_PRINTLN(x)
@ -563,7 +564,10 @@ void loop() {
DEBUG_PRINT("State time: "); DEBUG_PRINTLN(wifiStateChangedTime); DEBUG_PRINT("State time: "); DEBUG_PRINTLN(wifiStateChangedTime);
DEBUG_PRINT("NTP last sync: "); DEBUG_PRINTLN(ntpLastSyncTime); DEBUG_PRINT("NTP last sync: "); DEBUG_PRINTLN(ntpLastSyncTime);
DEBUG_PRINT("Client IP: "); DEBUG_PRINTLN(WiFi.localIP()); DEBUG_PRINT("Client IP: "); DEBUG_PRINTLN(WiFi.localIP());
DEBUG_PRINT("Loops/sec: "); DEBUG_PRINTLN(loops/10);
loops = 0;
debugTime = millis(); debugTime = millis();
} }
loops++;
#endif #endif
} }

View File

@ -188,14 +188,16 @@ void initConnection()
lastReconnectAttempt = millis(); lastReconnectAttempt = millis();
if (!apActive) {
if (apAlwaysOn) if (apAlwaysOn)
{ {
initAP(); initAP();
} else if (!apActive) } else
{ {
DEBUG_PRINTLN("Access point disabled."); DEBUG_PRINTLN("Access point disabled.");
WiFi.softAPdisconnect(true); WiFi.softAPdisconnect(true);
} }
}
if (!WLED_WIFI_CONFIGURED) if (!WLED_WIFI_CONFIGURED)
{ {

View File

@ -215,7 +215,11 @@ bool initMqtt()
lastMqttReconnectAttempt = millis(); lastMqttReconnectAttempt = millis();
if (mqttServer[0] == 0 || !WLED_CONNECTED) return false; if (mqttServer[0] == 0 || !WLED_CONNECTED) return false;
if (mqtt == nullptr) mqtt = new AsyncMqttClient(); if (mqtt == nullptr) {
mqtt = new AsyncMqttClient();
mqtt->onMessage(onMqttMessage);
mqtt->onConnect(onMqttConnect);
}
if (mqtt->connected()) return true; if (mqtt->connected()) return true;
DEBUG_PRINTLN("Reconnecting MQTT"); DEBUG_PRINTLN("Reconnecting MQTT");
@ -228,8 +232,6 @@ bool initMqtt()
} }
mqtt->setClientId(mqttClientID); mqtt->setClientId(mqttClientID);
if (mqttUser[0] && mqttPass[0]) mqtt->setCredentials(mqttUser, mqttPass); if (mqttUser[0] && mqttPass[0]) mqtt->setCredentials(mqttUser, mqttPass);
mqtt->onMessage(onMqttMessage);
mqtt->onConnect(onMqttConnect);
mqtt->connect(); mqtt->connect();
return true; return true;
} }