|
|
|
@ -41,6 +41,12 @@ |
|
|
|
|
#include "pktbuf.h" |
|
|
|
|
#include "mqtt.h" |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
#define DBG_MQTT(format, ...) os_printf(format, ## __VA_ARGS__) |
|
|
|
|
#else |
|
|
|
|
#define DBG_MQTT(format, ...) do { } while(0) |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
extern void dumpMem(void *buf, int len); |
|
|
|
|
|
|
|
|
|
// HACK
|
|
|
|
@ -129,9 +135,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
if (msg_len > client->in_buffer_size) { |
|
|
|
|
// oops, too long a message for us to digest, disconnect and hope for a miracle
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Too long a message (%d bytes)\n", msg_len); |
|
|
|
|
#endif |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -141,9 +145,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
if (client->connState != MQTT_CONNECTED) { |
|
|
|
|
// why are we receiving something??
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: recv in invalid state %d\n", client->connState); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT ERROR: recv in invalid state %d\n", client->connState); |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -155,16 +157,12 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
pending_msg_type = mqtt_get_type(client->pending_buffer->data); |
|
|
|
|
pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled); |
|
|
|
|
} |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", |
|
|
|
|
DBG_MQTT("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", |
|
|
|
|
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type],pending_msg_id); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
switch (msg_type) { |
|
|
|
|
case MQTT_MSG_TYPE_CONNACK: |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connect successful\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Connect successful\n"); |
|
|
|
|
// callbacks for internal and external clients
|
|
|
|
|
if (client->connectedCb) client->connectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client); |
|
|
|
@ -172,36 +170,28 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_SUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Subscribe successful\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Subscribe successful\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_UNSUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Unsubscribe successful\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Unsubscribe successful\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBACK: // ack for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS1 Publish successful\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: QoS1 Publish successful\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREC: // rec for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS2 publish cont\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: QoS2 publish cont\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBREL
|
|
|
|
|
mqtt_msg_pubrel(&client->mqtt_connection, msg_id); |
|
|
|
@ -212,9 +202,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBCOMP: // comp for a pubrel we sent (originally publish we sent)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS2 Publish successful\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: QoS2 Publish successful\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
@ -240,9 +228,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREL: // rel for a rec we sent (originally publish received)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Cont QoS2 recv\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Cont QoS2 recv\n"); |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBCOMP
|
|
|
|
|
mqtt_msg_pubcomp(&client->mqtt_connection, msg_id); |
|
|
|
@ -275,16 +261,16 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
*/ |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_sent_cb(void* arg) { |
|
|
|
|
//os_printf("MQTT: sent CB\n");
|
|
|
|
|
//DBG_MQTT("MQTT: sent CB\n");
|
|
|
|
|
struct espconn* pCon = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse; |
|
|
|
|
if (client == NULL) return; // aborted connection ?
|
|
|
|
|
//os_printf("MQTT: Sent\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Sent\n");
|
|
|
|
|
|
|
|
|
|
// if the message we sent is not a "pending" one, we need to free the buffer
|
|
|
|
|
if (client->sending_buffer != NULL) { |
|
|
|
|
PktBuf *buf = client->sending_buffer; |
|
|
|
|
//os_printf("PktBuf free %p l=%d\n", buf, buf->filled);
|
|
|
|
|
//DBG_MQTT("PktBuf free %p l=%d\n", buf, buf->filled);
|
|
|
|
|
os_free(buf); |
|
|
|
|
client->sending_buffer = NULL; |
|
|
|
|
} |
|
|
|
@ -303,7 +289,7 @@ mqtt_tcpclient_sent_cb(void* arg) { |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_timer(void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client*)arg; |
|
|
|
|
//os_printf("MQTT: timer CB\n");
|
|
|
|
|
//DBG_MQTT("MQTT: timer CB\n");
|
|
|
|
|
|
|
|
|
|
switch (client->connState) { |
|
|
|
|
default: break; |
|
|
|
@ -319,9 +305,7 @@ mqtt_timer(void* arg) { |
|
|
|
|
|
|
|
|
|
// check whether our last keep-alive timed out
|
|
|
|
|
if (client->keepAliveAckTick > 0 && --client->keepAliveAckTick == 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("\nMQTT ERROR: Keep-alive timed out\n"); |
|
|
|
|
#endif |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -329,7 +313,7 @@ mqtt_timer(void* arg) { |
|
|
|
|
// check whether we need to send a keep-alive message
|
|
|
|
|
if (client->keepAliveTick > 0 && --client->keepAliveTick == 0) { |
|
|
|
|
// timeout: we need to send a ping message
|
|
|
|
|
//os_printf("MQTT: Send keepalive\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Send keepalive\n");
|
|
|
|
|
mqtt_msg_pingreq(&client->mqtt_connection); |
|
|
|
|
PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length); |
|
|
|
|
os_memcpy(buf->data, client->mqtt_connection.message.data, |
|
|
|
@ -365,17 +349,13 @@ void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_discon_cb(void* arg) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnect CB, freeing espconn %p\n", arg); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Disconnect CB, freeing espconn %p\n", arg); |
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
|
|
|
|
|
// if this is an aborted connection we're done
|
|
|
|
|
if (client == NULL) return; |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnected from %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Disconnected from %s:%d\n", client->host, client->port); |
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client); |
|
|
|
|
|
|
|
|
@ -394,14 +374,10 @@ static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t err) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err); |
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client); |
|
|
|
|
|
|
|
|
@ -426,9 +402,7 @@ mqtt_tcpclient_connect_cb(void* arg) { |
|
|
|
|
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); |
|
|
|
|
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv); |
|
|
|
|
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb); |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: TCP connected to %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
// send MQTT connect message to broker
|
|
|
|
|
mqtt_msg_connect(&client->mqtt_connection, &client->connect_info); |
|
|
|
@ -461,7 +435,7 @@ mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len) { |
|
|
|
|
*/ |
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_send_message(MQTT_Client* client) { |
|
|
|
|
//os_printf("MQTT: Send_message\n");
|
|
|
|
|
//DBG_MQTT("MQTT: Send_message\n");
|
|
|
|
|
PktBuf *buf = client->msgQueue; |
|
|
|
|
if (buf == NULL || client->sending) return; // ahem...
|
|
|
|
|
client->msgQueue = PktBuf_Shift(client->msgQueue); |
|
|
|
@ -516,20 +490,16 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pConn->reverse; |
|
|
|
|
|
|
|
|
|
if (ipaddr == NULL) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT DNS: Got no ip, try to reconnect\n"); |
|
|
|
|
#endif |
|
|
|
|
os_printf("MQTT DNS: lookup failed\n"); |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT DNS: found ip %d.%d.%d.%d\n", |
|
|
|
|
DBG_MQTT("MQTT DNS: found ip %d.%d.%d.%d\n", |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 3)); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
if (client->ip.addr == 0 && ipaddr->addr != 0) { |
|
|
|
|
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); |
|
|
|
@ -539,15 +509,11 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
else |
|
|
|
|
err = espconn_connect(client->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
#endif |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
} else { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: connecting...\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: connecting...\n"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -581,9 +547,7 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q |
|
|
|
|
uint16_t buf_len = 3 + 2 + 2 + topic_length + data_length + 16; |
|
|
|
|
PktBuf *buf = PktBuf_New(buf_len); |
|
|
|
|
if (buf == NULL) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Cannot allocate buffer for %d byte publish\n", buf_len); |
|
|
|
|
#endif |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
// use a temporary mqtt_message_t pointing to our buffer, this is a bit of a mess because we
|
|
|
|
@ -592,9 +556,7 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q |
|
|
|
|
msg_conn_init(&msg, &client->mqtt_connection, buf->data, buf_len); |
|
|
|
|
uint16_t msg_id; |
|
|
|
|
if (!mqtt_msg_publish(&msg, topic, data, data_length, qos, retain, &msg_id)){ |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Queuing Publish failed\n"); |
|
|
|
|
#endif |
|
|
|
|
os_free(buf); |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
@ -603,7 +565,7 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q |
|
|
|
|
os_memcpy(buf->data, msg.message.data, msg.message.length); |
|
|
|
|
buf->filled = msg.message.length; |
|
|
|
|
|
|
|
|
|
os_printf("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length); |
|
|
|
|
DBG_MQTT("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length); |
|
|
|
|
dumpMem(buf, buf_len); |
|
|
|
|
client->msgQueue = PktBuf_Push(client->msgQueue, buf); |
|
|
|
|
|
|
|
|
@ -624,14 +586,10 @@ bool ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
uint16_t msg_id; |
|
|
|
|
if (!mqtt_msg_subscribe(&client->mqtt_connection, topic, 0, &msg_id)) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Queuing Subscribe failed (too long)\n"); |
|
|
|
|
#endif |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Subscribe, topic: \"%s\"\n", topic); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT: Subscribe, topic: \"%s\"\n", topic); |
|
|
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data, |
|
|
|
|
client->mqtt_connection.message.length); |
|
|
|
|
return TRUE; |
|
|
|
@ -660,9 +618,7 @@ void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, |
|
|
|
|
char* client_id, char* client_user, char* client_pass, |
|
|
|
|
uint8_t keepAliveTime) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT_Init\n"); |
|
|
|
|
#endif |
|
|
|
|
DBG_MQTT("MQTT_Init\n"); |
|
|
|
|
|
|
|
|
|
os_memset(mqttClient, 0, sizeof(MQTT_Client)); |
|
|
|
|
|
|
|
|
@ -739,9 +695,7 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
os_timer_arm(&mqttClient->mqttTimer, 1000, 1); |
|
|
|
|
|
|
|
|
|
// initiate the TCP connection or DNS lookup
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connect to %s:%d %p\n", mqttClient->host, mqttClient->port, mqttClient->pCon); |
|
|
|
|
#endif |
|
|
|
|
if (UTILS_StrToIP((const char *)mqttClient->host, |
|
|
|
|
(void*)&mqttClient->pCon->proto.tcp->remote_ip)) { |
|
|
|
|
uint8_t err; |
|
|
|
@ -750,9 +704,7 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
else |
|
|
|
|
err = espconn_connect(mqttClient->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
#endif |
|
|
|
|
os_free(mqttClient->pCon->proto.tcp); |
|
|
|
|
os_free(mqttClient->pCon); |
|
|
|
|
mqttClient->pCon = NULL; |
|
|
|
@ -770,9 +722,7 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
|
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_doAbort(MQTT_Client* client) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnecting from %s:%d (%p)\n", client->host, client->port, client->pCon); |
|
|
|
|
#endif |
|
|
|
|
client->pCon->reverse = NULL; // ensure we jettison this pCon...
|
|
|
|
|
if (client->security) |
|
|
|
|
espconn_secure_disconnect(client->pCon); |
|
|
|
|