|
|
|
@ -59,10 +59,12 @@ sint8 espconn_secure_sent(struct espconn *espconn, uint8 *psent, uint16 length) |
|
|
|
|
// max message size for sending (except publish)
|
|
|
|
|
#define MQTT_MAX_SHORT_MESSAGE 128 |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
static char* mqtt_msg_type[] = { |
|
|
|
|
"NULL", "TYPE_CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", |
|
|
|
|
"SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "RESV", |
|
|
|
|
}; |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
// forward declarations
|
|
|
|
|
static void mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len); |
|
|
|
@ -127,7 +129,9 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
if (msg_len > client->in_buffer_size) { |
|
|
|
|
// oops, too long a message for us to digest, disconnect and hope for a miracle
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Too long a message (%d bytes)\n", msg_len); |
|
|
|
|
#endif |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -137,7 +141,9 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
if (client->connState != MQTT_CONNECTED) { |
|
|
|
|
// why are we receiving something??
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: recv in invalid state %d\n", client->connState); |
|
|
|
|
#endif |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -149,13 +155,16 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
pending_msg_type = mqtt_get_type(client->pending_buffer->data); |
|
|
|
|
pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
os_printf("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%04X\n", |
|
|
|
|
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type], pending_msg_id); |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", |
|
|
|
|
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type],pending_msg_id); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
switch (msg_type) { |
|
|
|
|
case MQTT_MSG_TYPE_CONNACK: |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connect successful\n"); |
|
|
|
|
#endif |
|
|
|
|
// callbacks for internal and external clients
|
|
|
|
|
if (client->connectedCb) client->connectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client); |
|
|
|
@ -163,28 +172,36 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_SUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Subscribe successful\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_UNSUBACK: |
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Unsubscribe successful\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBACK: // ack for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS1 Publish successful\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREC: // rec for a publish we sent
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS2 publish cont\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBREL
|
|
|
|
|
mqtt_msg_pubrel(&client->mqtt_connection, msg_id); |
|
|
|
@ -195,7 +212,9 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBCOMP: // comp for a pubrel we sent (originally publish we sent)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: QoS2 Publish successful\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
@ -203,9 +222,11 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
case MQTT_MSG_TYPE_PUBLISH: { // incoming publish
|
|
|
|
|
// we may need to ACK the publish
|
|
|
|
|
uint8_t msg_qos = mqtt_get_qos(client->in_buffer); |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
uint16_t topic_length = msg_len; |
|
|
|
|
os_printf("MQTT: Recv PUBLISH qos=%d %s\n", msg_qos, |
|
|
|
|
mqtt_get_publish_topic(client->in_buffer, &topic_length)); |
|
|
|
|
#endif |
|
|
|
|
if (msg_qos == 1) mqtt_msg_puback(&client->mqtt_connection, msg_id); |
|
|
|
|
if (msg_qos == 2) mqtt_msg_pubrec(&client->mqtt_connection, msg_id); |
|
|
|
|
if (msg_qos == 1 || msg_qos == 2) { |
|
|
|
@ -219,7 +240,9 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { |
|
|
|
|
|
|
|
|
|
case MQTT_MSG_TYPE_PUBREL: // rel for a rec we sent (originally publish received)
|
|
|
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg_id == msg_id) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Cont QoS2 recv\n"); |
|
|
|
|
#endif |
|
|
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer); |
|
|
|
|
// we need to send PUBCOMP
|
|
|
|
|
mqtt_msg_pubcomp(&client->mqtt_connection, msg_id); |
|
|
|
@ -296,7 +319,9 @@ mqtt_timer(void* arg) { |
|
|
|
|
|
|
|
|
|
// check whether our last keep-alive timed out
|
|
|
|
|
if (client->keepAliveAckTick > 0 && --client->keepAliveAckTick == 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("\nMQTT ERROR: Keep-alive timed out\n"); |
|
|
|
|
#endif |
|
|
|
|
mqtt_doAbort(client); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -340,15 +365,17 @@ void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_discon_cb(void* arg) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnect CB, freeing espconn %p\n", arg); |
|
|
|
|
#endif |
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
|
|
|
|
|
// if this is an aborted connection we're done
|
|
|
|
|
if (client == NULL) return; |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnected from %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client); |
|
|
|
|
|
|
|
|
@ -367,12 +394,14 @@ static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t err) { |
|
|
|
|
struct espconn* pespconn = (struct espconn *)arg; |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse; |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err); |
|
|
|
|
#endif |
|
|
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp); |
|
|
|
|
os_free(pespconn); |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client); |
|
|
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client); |
|
|
|
|
|
|
|
|
@ -397,7 +426,9 @@ mqtt_tcpclient_connect_cb(void* arg) { |
|
|
|
|
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); |
|
|
|
|
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv); |
|
|
|
|
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb); |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: TCP connected to %s:%d\n", client->host, client->port); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
// send MQTT connect message to broker
|
|
|
|
|
mqtt_msg_connect(&client->mqtt_connection, &client->connect_info); |
|
|
|
@ -438,8 +469,9 @@ mqtt_send_message(MQTT_Client* client) { |
|
|
|
|
// get some details about the message
|
|
|
|
|
uint16_t msg_type = mqtt_get_type(buf->data); |
|
|
|
|
uint8_t msg_id = mqtt_get_id(buf->data, buf->filled); |
|
|
|
|
msg_id = msg_id; |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Send type=%s id=%04X len=%d\n", mqtt_msg_type[msg_type], msg_id, buf->filled); |
|
|
|
|
#if 0 |
|
|
|
|
for (int i=0; i<buf->filled; i++) { |
|
|
|
|
if (buf->data[i] >= ' ' && buf->data[i] <= '~') os_printf("%c", buf->data[i]); |
|
|
|
|
else os_printf("\\x%02X", buf->data[i]); |
|
|
|
@ -484,17 +516,20 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
MQTT_Client* client = (MQTT_Client *)pConn->reverse; |
|
|
|
|
|
|
|
|
|
if (ipaddr == NULL) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT DNS: Got no ip, try to reconnect\n"); |
|
|
|
|
#endif |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT DNS: found ip %d.%d.%d.%d\n", |
|
|
|
|
*((uint8 *)&ipaddr->addr), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 1), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 2), |
|
|
|
|
*((uint8 *)&ipaddr->addr + 3)); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
if (client->ip.addr == 0 && ipaddr->addr != 0) { |
|
|
|
|
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); |
|
|
|
@ -504,11 +539,15 @@ mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { |
|
|
|
|
else |
|
|
|
|
err = espconn_connect(client->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
#endif |
|
|
|
|
client->connState = TCP_RECONNECT_REQ; |
|
|
|
|
client->timeoutTick = 10; |
|
|
|
|
} else { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: connecting...\n"); |
|
|
|
|
#endif |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -542,7 +581,9 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q |
|
|
|
|
uint16_t buf_len = 3 + 2 + 2 + topic_length + data_length + 16; |
|
|
|
|
PktBuf *buf = PktBuf_New(buf_len); |
|
|
|
|
if (buf == NULL) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Cannot allocate buffer for %d byte publish\n", buf_len); |
|
|
|
|
#endif |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
// use a temporary mqtt_message_t pointing to our buffer, this is a bit of a mess because we
|
|
|
|
@ -551,7 +592,9 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q |
|
|
|
|
msg_conn_init(&msg, &client->mqtt_connection, buf->data, buf_len); |
|
|
|
|
uint16_t msg_id; |
|
|
|
|
if (!mqtt_msg_publish(&msg, topic, data, data_length, qos, retain, &msg_id)){ |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Queuing Publish failed\n"); |
|
|
|
|
#endif |
|
|
|
|
os_free(buf); |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
@ -581,10 +624,14 @@ bool ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
uint16_t msg_id; |
|
|
|
|
if (!mqtt_msg_subscribe(&client->mqtt_connection, topic, 0, &msg_id)) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Queuing Subscribe failed (too long)\n"); |
|
|
|
|
#endif |
|
|
|
|
return FALSE; |
|
|
|
|
} |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Subscribe, topic: \"%s\"\n", topic); |
|
|
|
|
#endif |
|
|
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data, |
|
|
|
|
client->mqtt_connection.message.length); |
|
|
|
|
return TRUE; |
|
|
|
@ -602,14 +649,20 @@ MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { |
|
|
|
|
* @param client_user: MQTT client user |
|
|
|
|
* @param client_pass: MQTT client password |
|
|
|
|
* @param keepAliveTime: MQTT keep alive timer, in second |
|
|
|
|
* @param cleanSession: MQTT ... |
|
|
|
|
* @param cleanSession: On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag.
|
|
|
|
|
* If clean session is set to false, then the connection is treated as durable. This means that when the client
|
|
|
|
|
* disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until |
|
|
|
|
* it connects again in the future. If clean session is true, then all subscriptions will be removed for the client |
|
|
|
|
* when it disconnects. |
|
|
|
|
* @retval None |
|
|
|
|
*/ |
|
|
|
|
void ICACHE_FLASH_ATTR |
|
|
|
|
MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, |
|
|
|
|
char* client_id, char* client_user, char* client_pass, |
|
|
|
|
uint8_t keepAliveTime) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT_Init\n"); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
os_memset(mqttClient, 0, sizeof(MQTT_Client)); |
|
|
|
|
|
|
|
|
@ -686,7 +739,9 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
os_timer_arm(&mqttClient->mqttTimer, 1000, 1); |
|
|
|
|
|
|
|
|
|
// initiate the TCP connection or DNS lookup
|
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Connect to %s:%d %p\n", mqttClient->host, mqttClient->port, mqttClient->pCon); |
|
|
|
|
#endif |
|
|
|
|
if (UTILS_StrToIP((const char *)mqttClient->host, |
|
|
|
|
(void*)&mqttClient->pCon->proto.tcp->remote_ip)) { |
|
|
|
|
uint8_t err; |
|
|
|
@ -695,7 +750,9 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
else |
|
|
|
|
err = espconn_connect(mqttClient->pCon); |
|
|
|
|
if (err != 0) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT ERROR: Failed to connect\n"); |
|
|
|
|
#endif |
|
|
|
|
os_free(mqttClient->pCon->proto.tcp); |
|
|
|
|
os_free(mqttClient->pCon); |
|
|
|
|
mqttClient->pCon = NULL; |
|
|
|
@ -713,7 +770,9 @@ MQTT_Connect(MQTT_Client* mqttClient) { |
|
|
|
|
|
|
|
|
|
static void ICACHE_FLASH_ATTR |
|
|
|
|
mqtt_doAbort(MQTT_Client* client) { |
|
|
|
|
#ifdef MQTT_DBG |
|
|
|
|
os_printf("MQTT: Disconnecting from %s:%d (%p)\n", client->host, client->port, client->pCon); |
|
|
|
|
#endif |
|
|
|
|
client->pCon->reverse = NULL; // ensure we jettison this pCon...
|
|
|
|
|
if (client->security) |
|
|
|
|
espconn_secure_disconnect(client->pCon); |
|
|
|
|