|
|
|
@ -42,9 +42,9 @@ |
|
|
|
|
#include "mqtt.h" |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
#define DBG(format, ...) os_printf(format, ## __VA_ARGS__) |
|
|
|
|
#define DBG_MQTT(format, ...) os_printf(format, ## __VA_ARGS__) |
|
|
|
|
#else |
|
|
|
|
#define DBG(format, ...) do { } while(0) |
|
|
|
|
#define DBG_MQTT(format, ...) do { } while(0) |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
extern void dumpMem(void *buf, int len); |
|
|
|
@ -145,7 +145,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
if (client->connState != MQTT_CONNECTED) { |
|
|
|
|
// why are we receiving something??
|
|
|
|
|
DBG("MQTT ERROR: recv in invalid state %d\n", client->connState); |
|
|
|
|
DBG_MQTT("MQTT ERROR: recv in invalid state %d\n", client->connState); |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -157,12 +157,12 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
pending_msg_type = mqtt_get_type(client->pending_buffer->data); |
|
|
|
|
pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled); |
|
|
|
|
} |
|
|
|
|
DBG("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", |
|
|
|
|
DBG_MQTT("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", |
|
|
|
|
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type],pending_msg_id); |
|
|
|
|
|
|
|
|
|
switch (msg_type) { |
|
|
|
|
case MQTT_MSG_TYPE_CONNACK: |
|
|
|
|
//DBG("MQTT: Connect successful\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Connect successful\n");
|
|
|
|
|
// callbacks for internal and external clients
|
|
|
|
|
if (client->connectedCb) client->connectedCb(client); |
|
|
|
|
if (client->cmdConnectedCb) client->cmdConnectedCb(client); |
|
|
|
@ -171,28 +171,28 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_SUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: Subscribe successful\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Subscribe successful\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_UNSUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: Unsubscribe successful\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Unsubscribe successful\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBACK: // ack for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: QoS1 Publish successful\n");
|
|
|
|
|
//DBG_MQTT("MQTT: QoS1 Publish successful\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREC: // rec for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: QoS2 publish cont\n");
|
|
|
|
|
//DBG_MQTT("MQTT: QoS2 publish cont\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBREL
|
|
|
|
|
mqtt_msg_pubrel(&client->mqtt_connection, msg_id); |
|
|
|
@ -203,7 +203,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBCOMP: // comp for a pubrel we sent (originally publish we sent)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: QoS2 Publish successful\n");
|
|
|
|
|
//DBG_MQTT("MQTT: QoS2 Publish successful\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
@ -229,7 +229,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREL: // rel for a rec we sent (originally publish received)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg_id == msg_id) { |
|
|
|
|
//DBG("MQTT: Cont QoS2 recv\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Cont QoS2 recv\n");
|
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBCOMP
|
|
|
|
|
mqtt_msg_pubcomp(&client->mqtt_connection, msg_id); |
|
|
|
@ -262,16 +262,16 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
*/ |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_sent_cb(void* arg) { |
|
|
|
|
//DBG("MQTT: sent CB\n");
|
|
|
|
|
//DBG_MQTT("MQTT: sent CB\n");
|
|
|
|
|
struct espconn* pCon = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
if (client == NULL) return; // aborted connection ?
|
|
|
|
|
//DBG("MQTT: Sent\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Sent\n");
|
|
|
|
|
|
|
|
|
|
// if the message we sent is not a "pending" one, we need to free the buffer
|
|
|
|
|
if (client->sending_buffer != NULL) { |
|
|
|
|
PktBuf *buf = client->sending_buffer; |
|
|
|
|
//DBG("PktBuf free %p l=%d\n", buf, buf->filled);
|
|
|
|
|
//DBG_MQTT("PktBuf free %p l=%d\n", buf, buf->filled);
|
|
|
|
|
os_free(buf); |
|
|
|
|
client->sending_buffer = NULL; |
|
|
|
|
} |
|
|
|
@ -290,7 +290,7 @@ mqtt_tcpclient_sent_cb(void* arg) { |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_timer(void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client*)arg; |
|
|
|
|
//DBG("MQTT: timer CB\n");
|
|
|
|
|
//DBG_MQTT("MQTT: timer CB\n");
|
|
|
|
|
|
|
|
|
|
switch (client->connState) { |
|
|
|
|
default: break; |
|
|
|
@ -314,7 +314,7 @@ mqtt_timer(void* arg) { |
|
|
|
|
// check whether we need to send a keep-alive message
|
|
|
|
|
if (client->keepAliveTick > 0 && --client->keepAliveTick == 0) { |
|
|
|
|
// timeout: we need to send a ping message
|
|
|
|
|
//DBG("MQTT: Send keepalive\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Send keepalive\n");
|
|
|
|
|
mqtt_msg_pingreq(&client->mqtt_connection); |
|
|
|
|
PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length); |
|
|
|
|
os_memcpy(buf->data, client->mqtt_connection.message.data, |
|
|
|
@ -350,13 +350,13 @@ void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_discon_cb(void* arg) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
DBG("MQTT: Disconnect CB, freeing espconn %p\n", arg); |
|
|
|
|
DBG_MQTT("MQTT: Disconnect CB, freeing espconn %p\n", arg); |
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
|
|
|
|
|
// if this is an aborted connection we're done
|
|
|
|
|
if (client == NULL) return; |
|
|
|
|
DBG("MQTT: Disconnected from %s:%d\n", client->host, client->port); |
|
|
|
|
DBG_MQTT("MQTT: Disconnected from %s:%d\n", client->host, client->port); |
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb(client); |
|
|
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client); |
|
|
|
|
|
|
|
|
@ -376,7 +376,7 @@ static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t err) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
//DBG("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
|
|
|
|
|
//DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
|
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port); |
|
|
|
@ -439,7 +439,7 @@ mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len) { |
|
|
|
|
*/ |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_send_message(MQTT_Client* client) { |
|
|
|
|
//DBG("MQTT: Send_message\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Send_message\n");
|
|
|
|
|
PktBuf *buf = client->msgQueue; |
|
|
|
|
if (buf == NULL || client->sending) return; // ahem...
|
|
|
|
|
client->msgQueue = PktBuf_Shift(client->msgQueue); |
|
|
|
@ -502,7 +502,7 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
DBG("MQTT: ip %d.%d.%d.%d\n", |
|
|
|
|
DBG_MQTT("MQTT: ip %d.%d.%d.%d\n", |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
@ -521,7 +521,7 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
} else { |
|
|
|
|
DBG("MQTT: connecting...\n"); |
|
|
|
|
DBG_MQTT("MQTT: connecting...\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -574,7 +574,7 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint16_t |
|
|
|
|
os_memcpy(buf->data, msg.message.data, msg.message.length); |
|
|
|
|
buf->filled = msg.message.length; |
|
|
|
|
|
|
|
|
|
DBG("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length); |
|
|
|
|
DBG_MQTT("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length); |
|
|
|
|
//dumpMem(buf, buf_len);
|
|
|
|
|
client->msgQueue = PktBuf_Push(client->msgQueue, buf); |
|
|
|
|
|
|
|
|
@ -598,7 +598,7 @@ MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
os_printf("MQTT ERROR: Queuing Subscribe failed (too long)\n"); |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
DBG("MQTT: Subscribe, topic: \"%s\"\n", topic); |
|
|
|
|
DBG_MQTT("MQTT: Subscribe, topic: \"%s\"\n", topic); |
|
|
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data, |
|
|
|
|
client->mqtt_connection.message.length); |
|
|
|
|
return TRUE; |
|
|
|
@ -627,7 +627,7 @@ void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Init(MQTT_Client* client, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, |
|
|
|
|
char* client_id, char* client_user, char* client_pass, |
|
|
|
|
uint8_t keepAliveTime) { |
|
|
|
|
DBG("MQTT_Init\n"); |
|
|
|
|
DBG_MQTT("MQTT_Init\n"); |
|
|
|
|
|
|
|
|
|
os_memset(client, 0, sizeof(MQTT_Client)); |
|
|
|
|
|
|
|
|
@ -754,7 +754,7 @@ mqtt_doAbort(MQTT_Client* client) { |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Reconnect(MQTT_Client* client) { |
|
|
|
|
DBG("MQTT: Reconnect requested\n"); |
|
|
|
|
DBG_MQTT("MQTT: Reconnect requested\n"); |
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) |
|
|
|
|
MQTT_Connect(client); |
|
|
|
|
else if (client->connState == MQTT_CONNECTED) |
|
|
|
@ -764,7 +764,7 @@ MQTT_Reconnect(MQTT_Client* client) { |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Disconnect(MQTT_Client* client) { |
|
|
|
|
DBG("MQTT: Disconnect requested\n"); |
|
|
|
|
DBG_MQTT("MQTT: Disconnect requested\n"); |
|
|
|
|
os_timer_disarm(&client->mqttTimer); |
|
|
|
|
if (client->connState == MQTT_DISCONNECTED) return; |
|
|
|
|
if (client->connState == TCP_RECONNECT_REQ) { |
|
|
|
@ -779,7 +779,7 @@ MQTT_Disconnect(MQTT_Client* client) { |
|
|
|
|
|
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Free(MQTT_Client* client) { |
|
|
|
|
DBG("MQTT: Free requested\n"); |
|
|
|
|
DBG_MQTT("MQTT: Free requested\n"); |
|
|
|
|
MQTT_Disconnect(client); |
|
|
|
|
|
|
|
|
|
if (client->host) os_free(client->host); |
|
|
|
|