improve mqtt disconnect/reconnect

pull/49/head
Thorsten von Eicken 9 years ago
parent 658c4d8b31
commit 82c65718cf
  1. 12
      esp-link/cgi.h
  2. 17
      esp-link/cgimqtt.c
  3. 5
      esp-link/config.c
  4. 65
      esp-link/mqtt_client.c
  5. 10
      html/mqtt.html
  6. 192
      mqtt/mqtt.c
  7. 6
      mqtt/mqtt.h

@ -6,9 +6,17 @@
void jsonHeader(HttpdConnData *connData, int code); void jsonHeader(HttpdConnData *connData, int code);
void errorResponse(HttpdConnData *connData, int code, char *message); void errorResponse(HttpdConnData *connData, int code, char *message);
int getStringArg(HttpdConnData *connData, char *name, char *config, int max_len);
int getBoolArg(HttpdConnData *connData, char *name, bool*config); // Get the HTTP query-string param 'name' and store it at 'config' with max length
// 'max_len' (incl terminating zero), returns -1 on error, 0 if not found, 1 if found
int8_t getStringArg(HttpdConnData *connData, char *name, char *config, int max_len);
// Get the HTTP query-string param 'name' and store it boolean value at 'config',
// supports 1/true and 0/false, returns -1 on error, 0 if not found, 1 if found
int8_t getBoolArg(HttpdConnData *connData, char *name, bool*config);
int cgiMenu(HttpdConnData *connData); int cgiMenu(HttpdConnData *connData);
uint8_t UTILS_StrToIP(const char* str, void *ip); uint8_t UTILS_StrToIP(const char* str, void *ip);
#endif #endif

@ -66,7 +66,7 @@ int ICACHE_FLASH_ATTR cgiMqttSet(HttpdConnData *connData) {
if (connData->conn==NULL) return HTTPD_CGI_DONE; if (connData->conn==NULL) return HTTPD_CGI_DONE;
// handle MQTT server settings // handle MQTT server settings
int mqtt_server = 0; // accumulator for changes/errors int8_t mqtt_server = 0; // accumulator for changes/errors
mqtt_server |= getStringArg(connData, "mqtt-host", mqtt_server |= getStringArg(connData, "mqtt-host",
flashConfig.mqtt_host, sizeof(flashConfig.mqtt_host)); flashConfig.mqtt_host, sizeof(flashConfig.mqtt_host));
if (mqtt_server < 0) return HTTPD_CGI_DONE; if (mqtt_server < 0) return HTTPD_CGI_DONE;
@ -85,7 +85,7 @@ int ICACHE_FLASH_ATTR cgiMqttSet(HttpdConnData *connData) {
&flashConfig.mqtt_clean_session); &flashConfig.mqtt_clean_session);
if (mqtt_server < 0) return HTTPD_CGI_DONE; if (mqtt_server < 0) return HTTPD_CGI_DONE;
mqtt_server |= getBoolArg(connData, "mqtt-enable", int8_t mqtt_en_chg = getBoolArg(connData, "mqtt-enable",
&flashConfig.mqtt_enable); &flashConfig.mqtt_enable);
char buff[16]; char buff[16];
@ -118,6 +118,19 @@ int ICACHE_FLASH_ATTR cgiMqttSet(HttpdConnData *connData) {
if (mqtt_server) { if (mqtt_server) {
#ifdef CGIMQTT_DBG #ifdef CGIMQTT_DBG
os_printf("MQTT server settings changed, enable=%d\n", flashConfig.mqtt_enable); os_printf("MQTT server settings changed, enable=%d\n", flashConfig.mqtt_enable);
#endif
MQTT_Free(&mqttClient); // safe even if not connected
MQTT_Init(&mqttClient, flashConfig.mqtt_host, flashConfig.mqtt_port, 0,
flashConfig.mqtt_timeout, flashConfig.mqtt_clientid,
flashConfig.mqtt_username, flashConfig.mqtt_password,
flashConfig.mqtt_keepalive);
if (flashConfig.mqtt_enable && strlen(flashConfig.mqtt_host) > 0)
MQTT_Connect(&mqttClient);
// if just enable changed we just need to bounce the client
} else if (mqtt_en_chg > 0) {
#ifdef CGIMQTT_DBG
os_printf("MQTT server enable=%d changed\n", flashConfig.mqtt_enable);
#endif #endif
if (flashConfig.mqtt_enable && strlen(flashConfig.mqtt_host) > 0) if (flashConfig.mqtt_enable && strlen(flashConfig.mqtt_host) > 0)
MQTT_Reconnect(&mqttClient); MQTT_Reconnect(&mqttClient);

@ -20,7 +20,7 @@ FlashConfig flashDefault = {
"\0", // api_key "\0", // api_key
0, 0, 0, // slip_enable, mqtt_enable, mqtt_status_enable 0, 0, 0, // slip_enable, mqtt_enable, mqtt_status_enable
2, 1, // mqtt_timeout, mqtt_clean_session 2, 1, // mqtt_timeout, mqtt_clean_session
1833, 600, // mqtt port, mqtt_keepalive 1883, 60, // mqtt port, mqtt_keepalive
"\0", "\0", "\0", "\0", "\0", // mqtt host, client_id, user, password, status-topic "\0", "\0", "\0", "\0", "\0", // mqtt host, client_id, user, password, status-topic
}; };
@ -115,13 +115,14 @@ bool ICACHE_FLASH_ATTR configRestore(void) {
os_memcpy(&flashConfig, &flashDefault, sizeof(FlashConfig)); os_memcpy(&flashConfig, &flashDefault, sizeof(FlashConfig));
char chipIdStr[6]; char chipIdStr[6];
os_sprintf(chipIdStr, "%06x", system_get_chip_id()); os_sprintf(chipIdStr, "%06x", system_get_chip_id());
os_memcpy(&flashConfig.mqtt_clientid, chipIdStr, os_strlen(chipIdStr));
#ifdef CHIP_IN_HOSTNAME #ifdef CHIP_IN_HOSTNAME
char hostname[16]; char hostname[16];
os_strcpy(hostname, "esp-link-"); os_strcpy(hostname, "esp-link-");
os_strcat(hostname, chipIdStr); os_strcat(hostname, chipIdStr);
os_memcpy(&flashConfig.hostname, hostname, os_strlen(hostname)); os_memcpy(&flashConfig.hostname, hostname, os_strlen(hostname));
#endif #endif
os_memcpy(&flashConfig.mqtt_clientid, &flashConfig.hostname, os_strlen(flashConfig.hostname));
os_memcpy(&flashConfig.mqtt_status_topic, &flashConfig.hostname, os_strlen(flashConfig.hostname));
flash_pri = 0; flash_pri = 0;
return false; return false;
} }

@ -12,10 +12,9 @@ static void ICACHE_FLASH_ATTR
mqttTimerCb(void *arg) mqttTimerCb(void *arg)
{ {
if (once++ > 0) return; if (once++ > 0) return;
MQTT_Init(&mqttClient, flashConfig.mqtt_host, flashConfig.mqtt_port, 0, 2, if (flashConfig.mqtt_enable)
flashConfig.mqtt_clientid, flashConfig.mqtt_username, flashConfig.mqtt_password, 60);
MQTT_Connect(&mqttClient); MQTT_Connect(&mqttClient);
MQTT_Subscribe(&mqttClient, "system/time", 0); //MQTT_Subscribe(&mqttClient, "system/time", 0);
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
@ -33,61 +32,9 @@ wifiStateChangeCb(uint8_t status)
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
mqtt_client_init() mqtt_client_init()
{ {
MQTT_Init(&mqttClient, flashConfig.mqtt_host, flashConfig.mqtt_port, 0,
flashConfig.mqtt_timeout, flashConfig.mqtt_clientid,
flashConfig.mqtt_username, flashConfig.mqtt_password,
flashConfig.mqtt_keepalive);
wifiAddStateChangeCb(wifiStateChangeCb); wifiAddStateChangeCb(wifiStateChangeCb);
} }
#if 0
MQTT_Client mqttClient;
void ICACHE_FLASH_ATTR
mqttConnectedCb(uint32_t *args) {
MQTT_Client* client = (MQTT_Client*)args;
MQTT_Publish(client, "announce/all", "Hello World!", 0, 0);
}
void ICACHE_FLASH_ATTR
mqttDisconnectedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT Disconnected\n");
}
void ICACHE_FLASH_ATTR
mqttTcpDisconnectedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT TCP Disconnected\n");
}
void ICACHE_FLASH_ATTR
mqttPublishedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT Published\n");
}
void ICACHE_FLASH_ATTR
mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len) {
char *topicBuf = (char*)os_zalloc(topic_len + 1);
char *dataBuf = (char*)os_zalloc(data_len + 1);
// MQTT_Client* client = (MQTT_Client*)args;
os_memcpy(topicBuf, topic, topic_len);
topicBuf[topic_len] = 0;
os_memcpy(dataBuf, data, data_len);
dataBuf[data_len] = 0;
os_printf("Receive topic: %s, data: %s\n", topicBuf, dataBuf);
os_free(topicBuf);
os_free(dataBuf);
}
MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY);
MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION);
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
MQTT_OnConnected(&mqttClient, mqttConnectedCb);
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);
MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb);
MQTT_OnPublished(&mqttClient, mqttPublishedCb);
MQTT_OnData(&mqttClient, mqttDataCb);
#endif

@ -40,18 +40,16 @@
<input type="text" name="mqtt-host"/> <input type="text" name="mqtt-host"/>
<label>Server port</label> <label>Server port</label>
<input type="text" name="mqtt-port"/> <input type="text" name="mqtt-port"/>
<label>Client Timeout</label>
<input type="text" name="mqtt-timeout" />
<label>Client ID</label> <label>Client ID</label>
<input type="text" name="mqtt-client-id"/> <input type="text" name="mqtt-client-id"/>
<label>Client Timeout (seconds)</label>
<input type="text" name="mqtt-timeout" />
<label>Keep Alive Interval (seconds)</label>
<input type="text" name="mqtt-keepalive" />
<label>Username</label> <label>Username</label>
<input type="text" name="mqtt-username"/> <input type="text" name="mqtt-username"/>
<label>Password</label> <label>Password</label>
<input type="password" name="mqtt-password"/> <input type="password" name="mqtt-password"/>
<label>Keep Alive Interval (seconds)</label>
<input type="text" name="mqtt-keepalive" />
<input type="checkbox" name="mqtt-clean-session" />
<label>Clean Session?</label>
</div> </div>
<button id="mqtt-button" type="submit" class="pure-button button-primary"> <button id="mqtt-button" type="submit" class="pure-button button-primary">
Update server settings! Update server settings!

@ -166,6 +166,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) {
// callbacks for internal and external clients // callbacks for internal and external clients
if (client->connectedCb) client->connectedCb((uint32_t*)client); if (client->connectedCb) client->connectedCb((uint32_t*)client);
if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client); if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client);
client->reconTimeout = 1; // reset the reconnect backoff
break; break;
case MQTT_MSG_TYPE_SUBACK: case MQTT_MSG_TYPE_SUBACK:
@ -361,7 +362,8 @@ mqtt_tcpclient_discon_cb(void* arg) {
// reconnect unless we're in a permanently disconnected state // reconnect unless we're in a permanently disconnected state
if (client->connState == MQTT_DISCONNECTED) return; if (client->connState == MQTT_DISCONNECTED) return;
client->timeoutTick = 2; client->timeoutTick = client->reconTimeout;
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
client->connState = TCP_RECONNECT_REQ; client->connState = TCP_RECONNECT_REQ;
} }
@ -374,7 +376,7 @@ static void ICACHE_FLASH_ATTR
mqtt_tcpclient_recon_cb(void* arg, int8_t err) { mqtt_tcpclient_recon_cb(void* arg, int8_t err) {
struct espconn* pespconn = (struct espconn *)arg; struct espconn* pespconn = (struct espconn *)arg;
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err); //DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
os_free(pespconn); os_free(pespconn);
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port); os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port);
@ -383,8 +385,10 @@ mqtt_tcpclient_recon_cb(void* arg, int8_t err) {
// reconnect unless we're in a permanently disconnected state // reconnect unless we're in a permanently disconnected state
if (client->connState == MQTT_DISCONNECTED) return; if (client->connState == MQTT_DISCONNECTED) return;
client->timeoutTick = 2; client->timeoutTick = client->reconTimeout;
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
client->connState = TCP_RECONNECT_REQ; client->connState = TCP_RECONNECT_REQ;
os_printf("timeoutTick=%d reconTimeout=%d\n", client->timeoutTick, client->reconTimeout);
} }
@ -492,12 +496,13 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) {
MQTT_Client* client = (MQTT_Client *)pConn->reverse; MQTT_Client* client = (MQTT_Client *)pConn->reverse;
if (ipaddr == NULL) { if (ipaddr == NULL) {
os_printf("MQTT DNS: lookup failed\n"); os_printf("MQTT: DNS lookup failed\n");
client->timeoutTick = 10; client->timeoutTick = client->reconTimeout;
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
return; return;
} }
DBG_MQTT("MQTT DNS: found ip %d.%d.%d.%d\n", DBG_MQTT("MQTT: ip %d.%d.%d.%d\n",
*((uint8 *)&ipaddr->addr), *((uint8 *)&ipaddr->addr),
*((uint8 *)&ipaddr->addr + 1), *((uint8 *)&ipaddr->addr + 1),
*((uint8 *)&ipaddr->addr + 2), *((uint8 *)&ipaddr->addr + 2),
@ -512,8 +517,9 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) {
err = espconn_connect(client->pCon); err = espconn_connect(client->pCon);
if (err != 0) { if (err != 0) {
os_printf("MQTT ERROR: Failed to connect\n"); os_printf("MQTT ERROR: Failed to connect\n");
client->timeoutTick = client->reconTimeout;
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
client->connState = TCP_RECONNECT_REQ; client->connState = TCP_RECONNECT_REQ;
client->timeoutTick = 10;
} else { } else {
DBG_MQTT("MQTT: connecting...\n"); DBG_MQTT("MQTT: connecting...\n");
} }
@ -617,58 +623,59 @@ MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) {
* @retval None * @retval None
*/ */
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, MQTT_Init(MQTT_Client* client, char* host, uint32 port, uint8_t security, uint8_t sendTimeout,
char* client_id, char* client_user, char* client_pass, char* client_id, char* client_user, char* client_pass,
uint8_t keepAliveTime) { uint8_t keepAliveTime) {
DBG_MQTT("MQTT_Init\n"); DBG_MQTT("MQTT_Init\n");
os_memset(mqttClient, 0, sizeof(MQTT_Client)); os_memset(client, 0, sizeof(MQTT_Client));
mqttClient->host = (char*)os_zalloc(os_strlen(host) + 1); client->host = (char*)os_zalloc(os_strlen(host) + 1);
os_strcpy(mqttClient->host, host); os_strcpy(client->host, host);
mqttClient->port = port; client->port = port;
mqttClient->security = !!security; client->security = !!security;
// timeouts with sanity checks // timeouts with sanity checks
mqttClient->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout; client->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout;
client->reconTimeout = 1; // reset reconnect back-off
os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
mqttClient->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1); client->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1);
os_strcpy(mqttClient->connect_info.client_id, client_id); os_strcpy(client->connect_info.client_id, client_id);
mqttClient->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1); client->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1);
os_strcpy(mqttClient->connect_info.username, client_user); os_strcpy(client->connect_info.username, client_user);
mqttClient->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1); client->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1);
os_strcpy(mqttClient->connect_info.password, client_pass); os_strcpy(client->connect_info.password, client_pass);
mqttClient->connect_info.keepalive = keepAliveTime; client->connect_info.keepalive = keepAliveTime;
mqttClient->connect_info.clean_session = 1; client->connect_info.clean_session = 1;
mqttClient->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE); client->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE);
mqttClient->in_buffer_size = MQTT_MAX_RCV_MESSAGE; client->in_buffer_size = MQTT_MAX_RCV_MESSAGE;
uint8_t *out_buffer = (uint8_t *)os_zalloc(MQTT_MAX_SHORT_MESSAGE); uint8_t *out_buffer = (uint8_t *)os_zalloc(MQTT_MAX_SHORT_MESSAGE);
mqtt_msg_init(&mqttClient->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE); mqtt_msg_init(&client->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE);
} }
/** /**
* @brief MQTT Set Last Will Topic, must be called before MQTT_Connect * @brief MQTT Set Last Will Topic, must be called before MQTT_Connect
*/ */
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, MQTT_InitLWT(MQTT_Client* client, char* will_topic, char* will_msg,
uint8_t will_qos, uint8_t will_retain) { uint8_t will_qos, uint8_t will_retain) {
mqttClient->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1); client->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1);
os_strcpy((char*)mqttClient->connect_info.will_topic, will_topic); os_strcpy((char*)client->connect_info.will_topic, will_topic);
mqttClient->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1); client->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1);
os_strcpy((char*)mqttClient->connect_info.will_message, will_msg); os_strcpy((char*)client->connect_info.will_message, will_msg);
mqttClient->connect_info.will_qos = will_qos; client->connect_info.will_qos = will_qos;
mqttClient->connect_info.will_retain = will_retain; client->connect_info.will_retain = will_retain;
// TODO: if we're connected we should disconnect and reconnect to establish the new LWT // TODO: if we're connected we should disconnect and reconnect to establish the new LWT
} }
@ -679,47 +686,47 @@ MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg,
* @retval None * @retval None
*/ */
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_Connect(MQTT_Client* mqttClient) { MQTT_Connect(MQTT_Client* client) {
//MQTT_Disconnect(mqttClient); //MQTT_Disconnect(client);
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); client->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
mqttClient->pCon->type = ESPCONN_TCP; client->pCon->type = ESPCONN_TCP;
mqttClient->pCon->state = ESPCONN_NONE; client->pCon->state = ESPCONN_NONE;
mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); client->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
mqttClient->pCon->proto.tcp->local_port = espconn_port(); client->pCon->proto.tcp->local_port = espconn_port();
mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; client->pCon->proto.tcp->remote_port = client->port;
mqttClient->pCon->reverse = mqttClient; client->pCon->reverse = client;
espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); espconn_regist_connectcb(client->pCon, mqtt_tcpclient_connect_cb);
espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); espconn_regist_reconcb(client->pCon, mqtt_tcpclient_recon_cb);
// start timer function to tick every second // start timer function to tick every second
os_timer_disarm(&mqttClient->mqttTimer); os_timer_disarm(&client->mqttTimer);
os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); os_timer_setfn(&client->mqttTimer, (os_timer_func_t *)mqtt_timer, client);
os_timer_arm(&mqttClient->mqttTimer, 1000, 1); os_timer_arm(&client->mqttTimer, 1000, 1);
// initiate the TCP connection or DNS lookup // initiate the TCP connection or DNS lookup
os_printf("MQTT: Connect to %s:%d %p\n", mqttClient->host, mqttClient->port, mqttClient->pCon); os_printf("MQTT: Connect to %s:%d %p\n", client->host, client->port, client->pCon);
if (UTILS_StrToIP((const char *)mqttClient->host, if (UTILS_StrToIP((const char *)client->host,
(void*)&mqttClient->pCon->proto.tcp->remote_ip)) { (void*)&client->pCon->proto.tcp->remote_ip)) {
uint8_t err; uint8_t err;
if (mqttClient->security) if (client->security)
err = espconn_secure_connect(mqttClient->pCon); err = espconn_secure_connect(client->pCon);
else else
err = espconn_connect(mqttClient->pCon); err = espconn_connect(client->pCon);
if (err != 0) { if (err != 0) {
os_printf("MQTT ERROR: Failed to connect\n"); os_printf("MQTT ERROR: Failed to connect\n");
os_free(mqttClient->pCon->proto.tcp); os_free(client->pCon->proto.tcp);
os_free(mqttClient->pCon); os_free(client->pCon);
mqttClient->pCon = NULL; client->pCon = NULL;
return; return;
} }
} else { } else {
espconn_gethostbyname(mqttClient->pCon, (const char *)mqttClient->host, &mqttClient->ip, espconn_gethostbyname(client->pCon, (const char *)client->host, &client->ip,
mqtt_dns_found); mqtt_dns_found);
} }
mqttClient->connState = TCP_CONNECTING; client->connState = TCP_CONNECTING;
mqttClient->timeoutTick = 20; // generous timeout to allow for DNS, etc client->timeoutTick = 20; // generous timeout to allow for DNS, etc
mqttClient->sending = FALSE; client->sending = FALSE;
} }
static void ICACHE_FLASH_ATTR static void ICACHE_FLASH_ATTR
@ -740,50 +747,71 @@ mqtt_doAbort(MQTT_Client* client) {
} }
client->pCon = NULL; // it will be freed in disconnect callback client->pCon = NULL; // it will be freed in disconnect callback
client->connState = TCP_RECONNECT_REQ; client->connState = TCP_RECONNECT_REQ;
client->timeoutTick = 2; // reconnect in a few seconds client->timeoutTick = client->reconTimeout; // reconnect in a few seconds
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_Reconnect(MQTT_Client* mqttClient) { MQTT_Reconnect(MQTT_Client* client) {
DBG_MQTT("MQTT: Reconnect requested\n"); DBG_MQTT("MQTT: Reconnect requested\n");
if (mqttClient->connState == MQTT_DISCONNECTED) if (client->connState == MQTT_DISCONNECTED)
MQTT_Connect(mqttClient); MQTT_Connect(client);
else if (mqttClient->connState == MQTT_CONNECTED) else if (client->connState == MQTT_CONNECTED)
mqtt_doAbort(mqttClient); mqtt_doAbort(client);
// in other cases we're already in the reconnecting process // in other cases we're already in the reconnecting process
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_Disconnect(MQTT_Client* mqttClient) { MQTT_Disconnect(MQTT_Client* client) {
DBG_MQTT("MQTT: Disconnect requested\n"); DBG_MQTT("MQTT: Disconnect requested\n");
os_timer_disarm(&mqttClient->mqttTimer); os_timer_disarm(&client->mqttTimer);
if (mqttClient->connState == MQTT_DISCONNECTED) return; if (client->connState == MQTT_DISCONNECTED) return;
if (mqttClient->connState == TCP_RECONNECT_REQ) { if (client->connState == TCP_RECONNECT_REQ) {
mqttClient->connState = MQTT_DISCONNECTED; client->connState = MQTT_DISCONNECTED;
return; return;
} }
mqtt_doAbort(mqttClient); mqtt_doAbort(client);
//void *out_buffer = mqttClient->mqtt_connection.buffer; //void *out_buffer = client->mqtt_connection.buffer;
//if (out_buffer != NULL) os_free(out_buffer); //if (out_buffer != NULL) os_free(out_buffer);
mqttClient->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect client->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
}
void ICACHE_FLASH_ATTR
MQTT_Free(MQTT_Client* client) {
DBG_MQTT("MQTT: Free requested\n");
MQTT_Disconnect(client);
if (client->host) os_free(client->host);
client->host = NULL;
if (client->connect_info.client_id) os_free(client->connect_info.client_id);
if (client->connect_info.username) os_free(client->connect_info.username);
if (client->connect_info.password) os_free(client->connect_info.password);
os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
if (client->in_buffer) os_free(client->in_buffer);
client->in_buffer = NULL;
if (client->mqtt_connection.buffer) os_free(client->mqtt_connection.buffer);
os_memset(&client->mqtt_connection, 0, sizeof(client->mqtt_connection));
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb) { MQTT_OnConnected(MQTT_Client* client, MqttCallback connectedCb) {
mqttClient->connectedCb = connectedCb; client->connectedCb = connectedCb;
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb) { MQTT_OnDisconnected(MQTT_Client* client, MqttCallback disconnectedCb) {
mqttClient->disconnectedCb = disconnectedCb; client->disconnectedCb = disconnectedCb;
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb) { MQTT_OnData(MQTT_Client* client, MqttDataCallback dataCb) {
mqttClient->dataCb = dataCb; client->dataCb = dataCb;
} }
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb) { MQTT_OnPublished(MQTT_Client* client, MqttCallback publishedCb) {
mqttClient->publishedCb = publishedCb; client->publishedCb = publishedCb;
} }

@ -77,6 +77,7 @@ typedef struct {
uint8_t keepAliveAckTick; // seconds 'til keep-alive ack is overdue (0=no k-a) uint8_t keepAliveAckTick; // seconds 'til keep-alive ack is overdue (0=no k-a)
uint8_t timeoutTick; // seconds 'til other timeout uint8_t timeoutTick; // seconds 'til other timeout
uint8_t sendTimeout; // value of send timeout setting uint8_t sendTimeout; // value of send timeout setting
uint8_t reconTimeout; // timeout to reconnect (back-off)
// callbacks // callbacks
MqttCallback connectedCb; MqttCallback connectedCb;
MqttCallback cmdConnectedCb; MqttCallback cmdConnectedCb;
@ -96,6 +97,11 @@ void MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port,
char* client_id, char* client_user, char* client_pass, char* client_id, char* client_user, char* client_pass,
uint8_t keepAliveTime); uint8_t keepAliveTime);
// Completely free buffers associated with client data structure
// This does not free the mqttClient struct itself, it just readies the struct so
// it can be freed or MQTT_Init can be called on it again
void MQTT_Free(MQTT_Client* mqttClient);
// Set Last Will Topic on client, must be called before MQTT_InitConnection // Set Last Will Topic on client, must be called before MQTT_InitConnection
void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg,
uint8_t will_qos, uint8_t will_retain); uint8_t will_qos, uint8_t will_retain);

Loading…
Cancel
Save