|
|
|
@ -166,6 +166,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
// callbacks for internal and external clients
|
|
|
|
|
if (client->connectedCb) client->connectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client); |
|
|
|
|
client->reconTimeout = 1; // reset the reconnect backoff
|
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_SUBACK: |
|
|
|
@ -361,7 +362,8 @@ mqtt_tcpclient_discon_cb(void* arg) { |
|
|
|
|
|
|
|
|
|
// reconnect unless we're in a permanently disconnected state
|
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) return; |
|
|
|
|
client->timeoutTick = 2; |
|
|
|
|
client->timeoutTick = client->reconTimeout; |
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -374,7 +376,7 @@ static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t err) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err); |
|
|
|
|
//DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
|
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port); |
|
|
|
@ -383,8 +385,10 @@ mqtt_tcpclient_recon_cb(void* arg, int8_t err) { |
|
|
|
|
|
|
|
|
|
// reconnect unless we're in a permanently disconnected state
|
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) return; |
|
|
|
|
client->timeoutTick = 2; |
|
|
|
|
client->timeoutTick = client->reconTimeout; |
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
os_printf("timeoutTick=%d reconTimeout=%d\n", client->timeoutTick, client->reconTimeout); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -492,12 +496,13 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pConn->reverse; |
|
|
|
|
|
|
|
|
|
if (ipaddr == NULL) { |
|
|
|
|
os_printf("MQTT DNS: lookup failed\n"); |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
os_printf("MQTT: DNS lookup failed\n"); |
|
|
|
|
client->timeoutTick = client->reconTimeout; |
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
DBG_MQTT("MQTT DNS: found ip %d.%d.%d.%d\n", |
|
|
|
|
DBG_MQTT("MQTT: ip %d.%d.%d.%d\n", |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
@ -512,8 +517,9 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
err = espconn_connect(client->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
client->timeoutTick = client->reconTimeout; |
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
} else { |
|
|
|
|
DBG_MQTT("MQTT: connecting...\n"); |
|
|
|
|
} |
|
|
|
@ -617,58 +623,59 @@ MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, |
|
|
|
|
MQTT_Init(MQTT_Client* client, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, |
|
|
|
|
char* client_id, char* client_user, char* client_pass, |
|
|
|
|
uint8_t keepAliveTime) { |
|
|
|
|
DBG_MQTT("MQTT_Init\n"); |
|
|
|
|
|
|
|
|
|
os_memset(mqttClient, 0, sizeof(MQTT_Client)); |
|
|
|
|
os_memset(client, 0, sizeof(MQTT_Client)); |
|
|
|
|
|
|
|
|
|
mqttClient->host = (char*)os_zalloc(os_strlen(host) + 1); |
|
|
|
|
os_strcpy(mqttClient->host, host); |
|
|
|
|
client->host = (char*)os_zalloc(os_strlen(host) + 1); |
|
|
|
|
os_strcpy(client->host, host); |
|
|
|
|
|
|
|
|
|
mqttClient->port = port; |
|
|
|
|
mqttClient->security = !!security; |
|
|
|
|
client->port = port; |
|
|
|
|
client->security = !!security; |
|
|
|
|
|
|
|
|
|
// timeouts with sanity checks
|
|
|
|
|
mqttClient->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout; |
|
|
|
|
client->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout; |
|
|
|
|
client->reconTimeout = 1; // reset reconnect back-off
|
|
|
|
|
|
|
|
|
|
os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); |
|
|
|
|
os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1); |
|
|
|
|
os_strcpy(mqttClient->connect_info.client_id, client_id); |
|
|
|
|
client->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1); |
|
|
|
|
os_strcpy(client->connect_info.client_id, client_id); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1); |
|
|
|
|
os_strcpy(mqttClient->connect_info.username, client_user); |
|
|
|
|
client->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1); |
|
|
|
|
os_strcpy(client->connect_info.username, client_user); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1); |
|
|
|
|
os_strcpy(mqttClient->connect_info.password, client_pass); |
|
|
|
|
client->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1); |
|
|
|
|
os_strcpy(client->connect_info.password, client_pass); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.keepalive = keepAliveTime; |
|
|
|
|
mqttClient->connect_info.clean_session = 1; |
|
|
|
|
client->connect_info.keepalive = keepAliveTime; |
|
|
|
|
client->connect_info.clean_session = 1; |
|
|
|
|
|
|
|
|
|
mqttClient->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE); |
|
|
|
|
mqttClient->in_buffer_size = MQTT_MAX_RCV_MESSAGE; |
|
|
|
|
client->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE); |
|
|
|
|
client->in_buffer_size = MQTT_MAX_RCV_MESSAGE; |
|
|
|
|
|
|
|
|
|
uint8_t *out_buffer = (uint8_t *)os_zalloc(MQTT_MAX_SHORT_MESSAGE); |
|
|
|
|
mqtt_msg_init(&mqttClient->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE); |
|
|
|
|
mqtt_msg_init(&client->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief MQTT Set Last Will Topic, must be called before MQTT_Connect |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, |
|
|
|
|
MQTT_InitLWT(MQTT_Client* client, char* will_topic, char* will_msg, |
|
|
|
|
uint8_t will_qos, uint8_t will_retain) { |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1); |
|
|
|
|
os_strcpy((char*)mqttClient->connect_info.will_topic, will_topic); |
|
|
|
|
client->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1); |
|
|
|
|
os_strcpy((char*)client->connect_info.will_topic, will_topic); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1); |
|
|
|
|
os_strcpy((char*)mqttClient->connect_info.will_message, will_msg); |
|
|
|
|
client->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1); |
|
|
|
|
os_strcpy((char*)client->connect_info.will_message, will_msg); |
|
|
|
|
|
|
|
|
|
mqttClient->connect_info.will_qos = will_qos; |
|
|
|
|
mqttClient->connect_info.will_retain = will_retain; |
|
|
|
|
client->connect_info.will_qos = will_qos; |
|
|
|
|
client->connect_info.will_retain = will_retain; |
|
|
|
|
|
|
|
|
|
// TODO: if we're connected we should disconnect and reconnect to establish the new LWT
|
|
|
|
|
} |
|
|
|
@ -679,47 +686,47 @@ MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
//MQTT_Disconnect(mqttClient);
|
|
|
|
|
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); |
|
|
|
|
mqttClient->pCon->type = ESPCONN_TCP; |
|
|
|
|
mqttClient->pCon->state = ESPCONN_NONE; |
|
|
|
|
mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); |
|
|
|
|
mqttClient->pCon->proto.tcp->local_port = espconn_port(); |
|
|
|
|
mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; |
|
|
|
|
mqttClient->pCon->reverse = mqttClient; |
|
|
|
|
espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); |
|
|
|
|
espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); |
|
|
|
|
MQTT_Connect(MQTT_Client* client) { |
|
|
|
|
//MQTT_Disconnect(client);
|
|
|
|
|
client->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); |
|
|
|
|
client->pCon->type = ESPCONN_TCP; |
|
|
|
|
client->pCon->state = ESPCONN_NONE; |
|
|
|
|
client->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); |
|
|
|
|
client->pCon->proto.tcp->local_port = espconn_port(); |
|
|
|
|
client->pCon->proto.tcp->remote_port = client->port; |
|
|
|
|
client->pCon->reverse = client; |
|
|
|
|
espconn_regist_connectcb(client->pCon, mqtt_tcpclient_connect_cb); |
|
|
|
|
espconn_regist_reconcb(client->pCon, mqtt_tcpclient_recon_cb); |
|
|
|
|
|
|
|
|
|
// start timer function to tick every second
|
|
|
|
|
os_timer_disarm(&mqttClient->mqttTimer); |
|
|
|
|
os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); |
|
|
|
|
os_timer_arm(&mqttClient->mqttTimer, 1000, 1); |
|
|
|
|
os_timer_disarm(&client->mqttTimer); |
|
|
|
|
os_timer_setfn(&client->mqttTimer, (os_timer_func_t *)mqtt_timer, client); |
|
|
|
|
os_timer_arm(&client->mqttTimer, 1000, 1); |
|
|
|
|
|
|
|
|
|
// initiate the TCP connection or DNS lookup
|
|
|
|
|
os_printf("MQTT: Connect to %s:%d %p\n", mqttClient->host, mqttClient->port, mqttClient->pCon); |
|
|
|
|
if (UTILS_StrToIP((const char *)mqttClient->host, |
|
|
|
|
(void*)&mqttClient->pCon->proto.tcp->remote_ip)) { |
|
|
|
|
os_printf("MQTT: Connect to %s:%d %p\n", client->host, client->port, client->pCon); |
|
|
|
|
if (UTILS_StrToIP((const char *)client->host, |
|
|
|
|
(void*)&client->pCon->proto.tcp->remote_ip)) { |
|
|
|
|
uint8_t err; |
|
|
|
|
if (mqttClient->security) |
|
|
|
|
err = espconn_secure_connect(mqttClient->pCon); |
|
|
|
|
if (client->security) |
|
|
|
|
err = espconn_secure_connect(client->pCon); |
|
|
|
|
else |
|
|
|
|
err = espconn_connect(mqttClient->pCon); |
|
|
|
|
err = espconn_connect(client->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
os_free(mqttClient->pCon->proto.tcp); |
|
|
|
|
os_free(mqttClient->pCon); |
|
|
|
|
mqttClient->pCon = NULL; |
|
|
|
|
os_free(client->pCon->proto.tcp); |
|
|
|
|
os_free(client->pCon); |
|
|
|
|
client->pCon = NULL; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
espconn_gethostbyname(mqttClient->pCon, (const char *)mqttClient->host, &mqttClient->ip, |
|
|
|
|
espconn_gethostbyname(client->pCon, (const char *)client->host, &client->ip, |
|
|
|
|
mqtt_dns_found); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mqttClient->connState = TCP_CONNECTING; |
|
|
|
|
mqttClient->timeoutTick = 20; // generous timeout to allow for DNS, etc
|
|
|
|
|
mqttClient->sending = FALSE; |
|
|
|
|
client->connState = TCP_CONNECTING; |
|
|
|
|
client->timeoutTick = 20; // generous timeout to allow for DNS, etc
|
|
|
|
|
client->sending = FALSE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
@ -740,50 +747,71 @@ mqtt_doAbort(MQTT_Client* client) { |
|
|
|
|
} |
|
|
|
|
client->pCon = NULL; // it will be freed in disconnect callback
|
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
client->timeoutTick = 2; // reconnect in a few seconds
|
|
|
|
|
client->timeoutTick = client->reconTimeout; // reconnect in a few seconds
|
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Reconnect(MQTT_Client* mqttClient) { |
|
|
|
|
MQTT_Reconnect(MQTT_Client* client) { |
|
|
|
|
DBG_MQTT("MQTT: Reconnect requested\n"); |
|
|
|
|
if (mqttClient->connState == MQTT_DISCONNECTED) |
|
|
|
|
MQTT_Connect(mqttClient); |
|
|
|
|
else if (mqttClient->connState == MQTT_CONNECTED) |
|
|
|
|
mqtt_doAbort(mqttClient); |
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) |
|
|
|
|
MQTT_Connect(client); |
|
|
|
|
else if (client->connState == MQTT_CONNECTED) |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
// in other cases we're already in the reconnecting process
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Disconnect(MQTT_Client* mqttClient) { |
|
|
|
|
MQTT_Disconnect(MQTT_Client* client) { |
|
|
|
|
DBG_MQTT("MQTT: Disconnect requested\n"); |
|
|
|
|
os_timer_disarm(&mqttClient->mqttTimer); |
|
|
|
|
if (mqttClient->connState == MQTT_DISCONNECTED) return; |
|
|
|
|
if (mqttClient->connState == TCP_RECONNECT_REQ) { |
|
|
|
|
mqttClient->connState = MQTT_DISCONNECTED; |
|
|
|
|
os_timer_disarm(&client->mqttTimer); |
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) return; |
|
|
|
|
if (client->connState == TCP_RECONNECT_REQ) { |
|
|
|
|
client->connState = MQTT_DISCONNECTED; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
mqtt_doAbort(mqttClient); |
|
|
|
|
//void *out_buffer = mqttClient->mqtt_connection.buffer;
|
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
//void *out_buffer = client->mqtt_connection.buffer;
|
|
|
|
|
//if (out_buffer != NULL) os_free(out_buffer);
|
|
|
|
|
mqttClient->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
|
|
|
|
|
client->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Free(MQTT_Client* client) { |
|
|
|
|
DBG_MQTT("MQTT: Free requested\n"); |
|
|
|
|
MQTT_Disconnect(client); |
|
|
|
|
|
|
|
|
|
if (client->host) os_free(client->host); |
|
|
|
|
client->host = NULL; |
|
|
|
|
|
|
|
|
|
if (client->connect_info.client_id) os_free(client->connect_info.client_id); |
|
|
|
|
if (client->connect_info.username) os_free(client->connect_info.username); |
|
|
|
|
if (client->connect_info.password) os_free(client->connect_info.password); |
|
|
|
|
os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); |
|
|
|
|
|
|
|
|
|
if (client->in_buffer) os_free(client->in_buffer); |
|
|
|
|
client->in_buffer = NULL; |
|
|
|
|
|
|
|
|
|
if (client->mqtt_connection.buffer) os_free(client->mqtt_connection.buffer); |
|
|
|
|
os_memset(&client->mqtt_connection, 0, sizeof(client->mqtt_connection)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb) { |
|
|
|
|
mqttClient->connectedCb = connectedCb; |
|
|
|
|
MQTT_OnConnected(MQTT_Client* client, MqttCallback connectedCb) { |
|
|
|
|
client->connectedCb = connectedCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb) { |
|
|
|
|
mqttClient->disconnectedCb = disconnectedCb; |
|
|
|
|
MQTT_OnDisconnected(MQTT_Client* client, MqttCallback disconnectedCb) { |
|
|
|
|
client->disconnectedCb = disconnectedCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb) { |
|
|
|
|
mqttClient->dataCb = dataCb; |
|
|
|
|
MQTT_OnData(MQTT_Client* client, MqttDataCallback dataCb) { |
|
|
|
|
client->dataCb = dataCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb) { |
|
|
|
|
mqttClient->publishedCb = publishedCb; |
|
|
|
|
MQTT_OnPublished(MQTT_Client* client, MqttCallback publishedCb) { |
|
|
|
|
client->publishedCb = publishedCb; |
|
|
|
|
} |
|
|
|
|