|
|
|
@ -33,41 +33,38 @@ |
|
|
|
|
|
|
|
|
|
#define MQTT_TASK_PRIO 0 |
|
|
|
|
#define MQTT_TASK_QUEUE_SIZE 1 |
|
|
|
|
#define MQTT_SEND_TIMOUT 5 |
|
|
|
|
#define MQTT_SEND_TIMOUT 5 |
|
|
|
|
|
|
|
|
|
#ifndef QUEUE_BUFFER_SIZE |
|
|
|
|
#define QUEUE_BUFFER_SIZE 2048 |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
unsigned char *default_certificate; |
|
|
|
|
unsigned char* default_certificate; |
|
|
|
|
unsigned int default_certificate_len = 0; |
|
|
|
|
unsigned char *default_private_key; |
|
|
|
|
unsigned char* default_private_key; |
|
|
|
|
unsigned int default_private_key_len = 0; |
|
|
|
|
|
|
|
|
|
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]; |
|
|
|
|
|
|
|
|
|
LOCAL void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) |
|
|
|
|
{ |
|
|
|
|
struct espconn *pConn = (struct espconn *)arg; |
|
|
|
|
mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
struct espconn* pConn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pConn->reverse; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (ipaddr == NULL) |
|
|
|
|
{ |
|
|
|
|
if (ipaddr == NULL) { |
|
|
|
|
os_printf("DNS: Found, but got no ip, try to reconnect\n"); |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
os_printf("DNS: found ip %d.%d.%d.%d\n", |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 3)); |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 3)); |
|
|
|
|
|
|
|
|
|
if (client->ip.addr == 0 && ipaddr->addr != 0) |
|
|
|
|
{ |
|
|
|
|
if (client->ip.addr == 0 && ipaddr->addr != 0) { |
|
|
|
|
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); |
|
|
|
|
#ifdef CLIENT_SSL_ENABLE |
|
|
|
|
if (client->security){ |
|
|
|
@ -75,7 +72,7 @@ mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_connect(client->pCon); |
|
|
|
|
espconn_connect(client->pCon); |
|
|
|
|
|
|
|
|
|
client->connState = TCP_CONNECTING; |
|
|
|
|
os_printf("MQTT-TCP: connecting...\n"); |
|
|
|
@ -85,10 +82,8 @@ mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOCAL void ICACHE_FLASH_ATTR |
|
|
|
|
deliver_publish(MQTT_Client* client, uint8_t* message, int length) |
|
|
|
|
{ |
|
|
|
|
deliver_publish(MQTT_Client* client, uint8_t* message, uint16_t length) { |
|
|
|
|
mqtt_event_data_t event_data; |
|
|
|
|
|
|
|
|
|
event_data.topic_length = length; |
|
|
|
@ -99,8 +94,10 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length) |
|
|
|
|
if (client->dataCb) |
|
|
|
|
client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
if (client->cmdDataCb) |
|
|
|
|
client->cmdDataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Client received callback function. |
|
|
|
@ -110,26 +107,25 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length) |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len) |
|
|
|
|
{ |
|
|
|
|
mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
uint8_t msg_type; |
|
|
|
|
uint8_t msg_qos; |
|
|
|
|
uint16_t msg_id; |
|
|
|
|
|
|
|
|
|
struct espconn *pCon = (struct espconn*)arg; |
|
|
|
|
MQTT_Client *client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
struct espconn* pCon = (struct espconn*)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
|
|
|
|
|
READPACKET: |
|
|
|
|
os_printf("MQTT-TCP: Data received %d bytes\n", len); |
|
|
|
|
if (len < MQTT_BUF_SIZE && len > 0){ |
|
|
|
|
if (len < MQTT_BUF_SIZE && len > 0) { |
|
|
|
|
os_memcpy(client->mqtt_state.in_buffer, pdata, len); |
|
|
|
|
|
|
|
|
|
msg_type = mqtt_get_type(client->mqtt_state.in_buffer); |
|
|
|
|
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); |
|
|
|
|
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); |
|
|
|
|
if (client->connState == MQTT_CONNECT_SENDING) { |
|
|
|
|
if (msg_type == MQTT_MSG_TYPE_CONNACK){ |
|
|
|
|
if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){ |
|
|
|
|
if (msg_type == MQTT_MSG_TYPE_CONNACK) { |
|
|
|
|
if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT) { |
|
|
|
|
os_printf("MQTT: Invalid packet\n"); |
|
|
|
|
#ifdef CLIENT_SSL_ENABLE |
|
|
|
|
if (client->security){ |
|
|
|
@ -137,13 +133,15 @@ READPACKET: |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_disconnect(client->pCon); |
|
|
|
|
espconn_disconnect(client->pCon); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
os_printf("MQTT: Connected to %s:%ld\n", client->host, client->port); |
|
|
|
|
client->connState = MQTT_DATA; |
|
|
|
|
if (client->connectedCb) |
|
|
|
|
client->connectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdConnectedCb) |
|
|
|
|
client->cmdConnectedCb((uint32_t*)client); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -154,50 +152,50 @@ READPACKET: |
|
|
|
|
|
|
|
|
|
if (msg_type == MQTT_MSG_TYPE_SUBACK) { |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) |
|
|
|
|
os_printf("MQTT: Subscribe successful\n"); |
|
|
|
|
os_printf("MQTT: Subscribe successful\n"); |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_UNSUBACK){ |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_UNSUBACK) { |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) |
|
|
|
|
os_printf("MQTT: UnSubscribe successful\n"); |
|
|
|
|
os_printf("MQTT: UnSubscribe successful\n"); |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PUBLISH) { |
|
|
|
|
if (msg_qos == 1) |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); |
|
|
|
|
else if (msg_qos == 2) |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); |
|
|
|
|
if (msg_qos == 1 || msg_qos == 2){ |
|
|
|
|
if (msg_qos == 1 || msg_qos == 2) { |
|
|
|
|
os_printf("MQTT: Queue response QoS: %d\n", msg_qos); |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PUBACK) { |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){ |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { |
|
|
|
|
os_printf("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PUBREC) { |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PUBREL) { |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PUBCOMP) { |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){ |
|
|
|
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { |
|
|
|
|
os_printf("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (msg_type == MQTT_MSG_TYPE_PINGREQ) { |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -205,12 +203,10 @@ READPACKET: |
|
|
|
|
// NOTE: this is done down here and not in the switch case above
|
|
|
|
|
// because the PSOCK_READBUF_LEN() won't work inside a switch
|
|
|
|
|
// statement due to the way protothreads resume.
|
|
|
|
|
if (msg_type == MQTT_MSG_TYPE_PUBLISH) |
|
|
|
|
{ |
|
|
|
|
if (msg_type == MQTT_MSG_TYPE_PUBLISH) { |
|
|
|
|
len = client->mqtt_state.message_length_read; |
|
|
|
|
|
|
|
|
|
if (client->mqtt_state.message_length < client->mqtt_state.message_length_read) |
|
|
|
|
{ |
|
|
|
|
if (client->mqtt_state.message_length < client->mqtt_state.message_length_read) { |
|
|
|
|
//client->connState = MQTT_PUBLISH_RECV;
|
|
|
|
|
//Not Implement yet
|
|
|
|
|
len -= client->mqtt_state.message_length; |
|
|
|
@ -234,26 +230,27 @@ READPACKET: |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_sent_cb(void *arg) |
|
|
|
|
{ |
|
|
|
|
struct espconn *pCon = (struct espconn *)arg; |
|
|
|
|
mqtt_tcpclient_sent_cb(void* arg) { |
|
|
|
|
struct espconn* pCon = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
os_printf("MQTT-TCP: Sent\n"); |
|
|
|
|
client->sendTimeout = 0; |
|
|
|
|
if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH){ |
|
|
|
|
if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) { |
|
|
|
|
if (client->publishedCb) |
|
|
|
|
client->publishedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdPublishedCb) |
|
|
|
|
client->cmdPublishedCb((uint32_t*)client); |
|
|
|
|
} |
|
|
|
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR mqtt_timer(void *arg) |
|
|
|
|
{ |
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
|
|
|
mqtt_timer(void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client*)arg; |
|
|
|
|
|
|
|
|
|
if (client->connState == MQTT_DATA){ |
|
|
|
|
if (client->connState == MQTT_DATA) { |
|
|
|
|
client->keepAliveTick++; |
|
|
|
|
if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive){ |
|
|
|
|
if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) { |
|
|
|
|
|
|
|
|
|
os_printf("\nMQTT: Send keepalive packet to %s:%ld!\n", client->host, client->port); |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); |
|
|
|
@ -270,7 +267,7 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg) |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); |
|
|
|
|
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); |
|
|
|
|
|
|
|
|
|
client->mqtt_state.outbound_message = NULL; |
|
|
|
|
|
|
|
|
@ -279,11 +276,15 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
else if (client->connState == TCP_RECONNECT_REQ){ |
|
|
|
|
else if (client->connState == TCP_RECONNECT_REQ) { |
|
|
|
|
client->reconnectTick++; |
|
|
|
|
if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) { |
|
|
|
|
client->reconnectTick = 0; |
|
|
|
|
client->connState = TCP_RECONNECT; |
|
|
|
|
if (client->tcpDisconnectedCb) |
|
|
|
|
client->tcpDisconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdTcpDisconnectedCb) |
|
|
|
|
client->cmdTcpDisconnectedCb((uint32_t*)client); |
|
|
|
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -292,15 +293,16 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_discon_cb(void *arg) |
|
|
|
|
{ |
|
|
|
|
mqtt_tcpclient_discon_cb(void* arg) { |
|
|
|
|
|
|
|
|
|
struct espconn *pespconn = (struct espconn *)arg; |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
os_printf("MQTT-TCP: Disconnected callback\n"); |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
if (client->disconnectedCb) |
|
|
|
|
client->disconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdDisconnectedCb) |
|
|
|
|
client->cmdDisconnectedCb((uint32_t*)client); |
|
|
|
|
|
|
|
|
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); |
|
|
|
|
} |
|
|
|
@ -311,9 +313,8 @@ mqtt_tcpclient_discon_cb(void *arg) |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_connect_cb(void *arg) |
|
|
|
|
{ |
|
|
|
|
struct espconn *pCon = (struct espconn *)arg; |
|
|
|
|
mqtt_tcpclient_connect_cb(void* arg) { |
|
|
|
|
struct espconn* pCon = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
|
|
|
|
|
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); |
|
|
|
@ -335,7 +336,7 @@ mqtt_tcpclient_connect_cb(void *arg) |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); |
|
|
|
|
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); |
|
|
|
|
|
|
|
|
|
client->mqtt_state.outbound_message = NULL; |
|
|
|
|
client->connState = MQTT_CONNECT_SENDING; |
|
|
|
@ -348,9 +349,8 @@ mqtt_tcpclient_connect_cb(void *arg) |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recon_cb(void *arg, sint8 errType) |
|
|
|
|
{ |
|
|
|
|
struct espconn *pCon = (struct espconn *)arg; |
|
|
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t errType) { |
|
|
|
|
struct espconn* pCon = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
|
|
|
|
|
os_printf("MQTT-TCP: Reconnect to %s:%ld\n", client->host, client->port); |
|
|
|
@ -371,21 +371,20 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType) |
|
|
|
|
* @retval TRUE if success queue |
|
|
|
|
*/ |
|
|
|
|
bool ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int qos, int retain) |
|
|
|
|
{ |
|
|
|
|
MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain) { |
|
|
|
|
int data_length = os_strlen(data); |
|
|
|
|
uint8_t dataBuffer[MQTT_BUF_SIZE]; |
|
|
|
|
uint16_t dataLen; |
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, |
|
|
|
|
topic, data, data_length, |
|
|
|
|
qos, retain, |
|
|
|
|
&client->mqtt_state.pending_msg_id); |
|
|
|
|
if (client->mqtt_state.outbound_message->length == 0){ |
|
|
|
|
topic, data, data_length, |
|
|
|
|
qos, retain, |
|
|
|
|
&client->mqtt_state.pending_msg_id); |
|
|
|
|
if (client->mqtt_state.outbound_message->length == 0) { |
|
|
|
|
os_printf("MQTT: Queuing Publish failed\n"); |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
os_printf("MQTT: Queuing Publish, length: %d, queue size(%ld/%ld)\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size); |
|
|
|
|
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { |
|
|
|
|
os_printf("MQTT: Serious buffer error\n"); |
|
|
|
@ -404,16 +403,15 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int qos, |
|
|
|
|
* @retval TRUE if success queue |
|
|
|
|
*/ |
|
|
|
|
bool ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos) |
|
|
|
|
{ |
|
|
|
|
MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
uint8_t dataBuffer[MQTT_BUF_SIZE]; |
|
|
|
|
uint16_t dataLen; |
|
|
|
|
|
|
|
|
|
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, |
|
|
|
|
topic, 0, |
|
|
|
|
&client->mqtt_state.pending_msg_id); |
|
|
|
|
topic, 0, |
|
|
|
|
&client->mqtt_state.pending_msg_id); |
|
|
|
|
os_printf("MQTT: Queue Subscribe, topic: \"%s\", id: %d\n", topic, client->mqtt_state.pending_msg_id); |
|
|
|
|
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ |
|
|
|
|
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { |
|
|
|
|
os_printf("MQTT: Queue full\n"); |
|
|
|
|
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { |
|
|
|
|
os_printf("MQTT: Serious buffer error\n"); |
|
|
|
@ -424,7 +422,8 @@ MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos) |
|
|
|
|
return TRUE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) { |
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
|
|
|
MQTT_Task(os_event_t* e) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client*)e->par; |
|
|
|
|
uint8_t dataBuffer[MQTT_BUF_SIZE]; |
|
|
|
|
uint16_t dataLen; |
|
|
|
@ -434,7 +433,7 @@ void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) { |
|
|
|
|
if (client->connState == TCP_RECONNECT_REQ) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
else if (client->connState == TCP_RECONNECT){ |
|
|
|
|
else if (client->connState == TCP_RECONNECT) { |
|
|
|
|
MQTT_Connect(client); |
|
|
|
|
os_printf("MQTT-TCP: Reconnect to: %s:%ld\n", client->host, client->port); |
|
|
|
|
client->connState = TCP_CONNECTING; |
|
|
|
@ -443,7 +442,7 @@ void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) { |
|
|
|
|
if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0){ |
|
|
|
|
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) { |
|
|
|
|
client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer); |
|
|
|
|
client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen); |
|
|
|
|
client->sendTimeout = MQTT_SEND_TIMOUT; |
|
|
|
@ -454,7 +453,7 @@ void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) { |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_sent(client->pCon, dataBuffer, dataLen); |
|
|
|
|
espconn_sent(client->pCon, dataBuffer, dataLen); |
|
|
|
|
|
|
|
|
|
client->mqtt_state.outbound_message = NULL; |
|
|
|
|
return; |
|
|
|
@ -472,8 +471,7 @@ void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) { |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security) |
|
|
|
|
{ |
|
|
|
|
MQTT_InitConnection(MQTT_Client* mqttClient, uint8_t* host, uint32 port, uint8_t security) { |
|
|
|
|
uint32_t temp; |
|
|
|
|
os_printf("MQTT_InitConnection\n"); |
|
|
|
|
os_memset(mqttClient, 0, sizeof(MQTT_Client)); |
|
|
|
@ -495,8 +493,7 @@ MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession) |
|
|
|
|
{ |
|
|
|
|
MQTT_InitClient(MQTT_Client* mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint8_t keepAliveTime, uint8_t cleanSession) { |
|
|
|
|
uint32_t temp; |
|
|
|
|
os_printf("MQTT_InitClient\n"); |
|
|
|
|
|
|
|
|
@ -536,8 +533,7 @@ MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_use |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain) |
|
|
|
|
{ |
|
|
|
|
MQTT_InitLWT(MQTT_Client* mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain) { |
|
|
|
|
uint32_t temp; |
|
|
|
|
temp = os_strlen((char*)will_topic); |
|
|
|
|
mqttClient->connect_info.will_topic = (char*)os_zalloc(temp + 1); |
|
|
|
@ -560,8 +556,7 @@ MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, ui |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Connect(MQTT_Client *mqttClient) |
|
|
|
|
{ |
|
|
|
|
MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
MQTT_Disconnect(mqttClient); |
|
|
|
|
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); |
|
|
|
|
mqttClient->pCon->type = ESPCONN_TCP; |
|
|
|
@ -589,7 +584,7 @@ MQTT_Connect(MQTT_Client *mqttClient) |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
#endif |
|
|
|
|
espconn_connect(mqttClient->pCon); |
|
|
|
|
espconn_connect(mqttClient->pCon); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
os_printf("MQTT-TCP: Connect to domain %s:%ld\n", mqttClient->host, mqttClient->port); |
|
|
|
@ -599,12 +594,11 @@ MQTT_Connect(MQTT_Client *mqttClient) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Disconnect(MQTT_Client *mqttClient) |
|
|
|
|
{ |
|
|
|
|
if (mqttClient->pCon){ |
|
|
|
|
MQTT_Disconnect(MQTT_Client* mqttClient) { |
|
|
|
|
if (mqttClient->pCon) { |
|
|
|
|
os_printf("Free memory\n"); |
|
|
|
|
if (mqttClient->pCon->proto.tcp) |
|
|
|
|
os_free(mqttClient->pCon->proto.tcp); |
|
|
|
|
os_free(mqttClient->pCon->proto.tcp); |
|
|
|
|
os_free(mqttClient->pCon); |
|
|
|
|
mqttClient->pCon = NULL; |
|
|
|
|
} |
|
|
|
@ -613,25 +607,27 @@ MQTT_Disconnect(MQTT_Client *mqttClient) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb) |
|
|
|
|
{ |
|
|
|
|
MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb) { |
|
|
|
|
mqttClient->connectedCb = connectedCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb) |
|
|
|
|
{ |
|
|
|
|
MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb) { |
|
|
|
|
mqttClient->disconnectedCb = disconnectedCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb) |
|
|
|
|
MQTT_OnTcpDisconnected(MQTT_Client *mqttClient, MqttCallback tcpDisconnectedCb) |
|
|
|
|
{ |
|
|
|
|
mqttClient->tcpDisconnectedCb = tcpDisconnectedCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb) { |
|
|
|
|
mqttClient->dataCb = dataCb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb) |
|
|
|
|
{ |
|
|
|
|
MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb) { |
|
|
|
|
mqttClient->publishedCb = publishedCb; |
|
|
|
|
} |
|
|
|
|