mirror of https://github.com/jeelabs/esp-link.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
820 lines
30 KiB
820 lines
30 KiB
/* mqtt.c
|
|
* Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
|
|
*
|
|
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Modified by Thorsten von Eicken to make it fully callback based
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
// TODO:
|
|
// Handle SessionPresent=0 in CONNACK and rexmit subscriptions
|
|
// Improve timeout for CONNACK, currently only has keep-alive timeout (maybe send artificial ping?)
|
|
// Allow messages that don't require ACK to be sent even when pending_buffer is != NULL
|
|
// Set dup flag in retransmissions
|
|
|
|
#include <esp8266.h>
|
|
#include "pktbuf.h"
|
|
#include "mqtt.h"
|
|
|
|
#ifdef MQTT_DBG
|
|
#define DBG_MQTT(format, ...) os_printf(format, ## __VA_ARGS__)
|
|
#else
|
|
#define DBG_MQTT(format, ...) do { } while(0)
|
|
#endif
|
|
|
|
extern void dumpMem(void *buf, int len);
|
|
|
|
// HACK
|
|
sint8 espconn_secure_connect(struct espconn *espconn) {
|
|
return espconn_connect(espconn);
|
|
}
|
|
/**
 * @brief  Stand-in for the secure disconnect call, forwards to espconn_disconnect().
 * @param  espconn: connection to close
 * @retval the result of espconn_disconnect()
 */
sint8 espconn_secure_disconnect(struct espconn *espconn) {
  sint8 rc = espconn_disconnect(espconn);
  return rc;
}
|
|
/**
 * @brief  Stand-in for the secure send call, forwards to espconn_sent().
 * @param  espconn: connection to send on
 * @param  psent: data to send
 * @param  length: number of bytes at psent
 * @retval the result of espconn_sent()
 */
sint8 espconn_secure_sent(struct espconn *espconn, uint8 *psent, uint16 length) {
  sint8 rc = espconn_sent(espconn, psent, length);
  return rc;
}
|
|
|
|
// max message size supported for receive
|
|
#define MQTT_MAX_RCV_MESSAGE 2048
|
|
// max message size for sending (except publish)
|
|
#define MQTT_MAX_SHORT_MESSAGE 128
|
|
|
|
#ifdef MQTT_DBG
|
|
// Printable names for the message types, indexed by the value returned by
// mqtt_get_type(). Only used by the debug prints. The table is read-only, hence
// const pointers to const characters (the entries are string literals).
static const char * const mqtt_msg_type[] = {
  "NULL", "TYPE_CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP",
  "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "RESV",
};
|
|
#endif
|
|
|
|
// forward declarations
|
|
static void mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len);
|
|
static void mqtt_send_message(MQTT_Client* client);
|
|
static void mqtt_doAbort(MQTT_Client* client);
|
|
|
|
// Deliver a publish message to the client
|
|
static void ICACHE_FLASH_ATTR
|
|
deliver_publish(MQTT_Client* client, uint8_t* message, uint16_t length) {
|
|
|
|
// parse the message into topic and data
|
|
uint16_t topic_length = length;
|
|
const char *topic = mqtt_get_publish_topic(message, &topic_length);
|
|
uint16_t data_length = length;
|
|
const char *data = mqtt_get_publish_data(message, &data_length);
|
|
|
|
// callback to client
|
|
if (client->dataCb)
|
|
client->dataCb(client, topic, topic_length, data, data_length);
|
|
if (client->cmdDataCb)
|
|
client->cmdDataCb(client, topic, topic_length, data, data_length);
|
|
}
|
|
|
|
/**
|
|
* @brief Client received callback function.
|
|
* @param arg: contain the ip link information
|
|
* @param pdata: received data
|
|
* @param len: the length of received data
|
|
* @retval None
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) {
|
|
//os_printf("MQTT: recv CB\n");
|
|
uint8_t msg_type;
|
|
uint16_t msg_id;
|
|
uint16_t msg_len;
|
|
|
|
struct espconn* pCon = (struct espconn*)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
|
if (client == NULL) return; // aborted connection
|
|
|
|
//os_printf("MQTT: Data received %d bytes\n", len);
|
|
|
|
do {
|
|
// append data to our buffer
|
|
int avail = client->in_buffer_size - client->in_buffer_filled;
|
|
if (len <= avail) {
|
|
os_memcpy(client->in_buffer + client->in_buffer_filled, pdata, len);
|
|
client->in_buffer_filled += len;
|
|
len = 0;
|
|
} else {
|
|
os_memcpy(client->in_buffer + client->in_buffer_filled, pdata, avail);
|
|
client->in_buffer_filled += avail;
|
|
len -= avail;
|
|
pdata += avail;
|
|
}
|
|
|
|
// check out what's at the head of the buffer
|
|
msg_type = mqtt_get_type(client->in_buffer);
|
|
msg_id = mqtt_get_id(client->in_buffer, client->in_buffer_size);
|
|
msg_len = mqtt_get_total_length(client->in_buffer, client->in_buffer_size);
|
|
|
|
if (msg_len > client->in_buffer_size) {
|
|
// oops, too long a message for us to digest, disconnect and hope for a miracle
|
|
os_printf("MQTT: Too long a message (%d bytes)\n", msg_len);
|
|
mqtt_doAbort(client);
|
|
return;
|
|
}
|
|
|
|
// check whether what's left in the buffer is a complete message
|
|
if (msg_len > client->in_buffer_filled) break;
|
|
|
|
if (client->connState != MQTT_CONNECTED) {
|
|
// why are we receiving something??
|
|
DBG_MQTT("MQTT ERROR: recv in invalid state %d\n", client->connState);
|
|
mqtt_doAbort(client);
|
|
return;
|
|
}
|
|
|
|
// we are connected and are sending/receiving data messages
|
|
uint8_t pending_msg_type = 0;
|
|
uint16_t pending_msg_id = 0;
|
|
if (client->pending_buffer != NULL) {
|
|
pending_msg_type = mqtt_get_type(client->pending_buffer->data);
|
|
pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled);
|
|
}
|
|
DBG_MQTT("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n",
|
|
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type],pending_msg_id);
|
|
|
|
switch (msg_type) {
|
|
case MQTT_MSG_TYPE_CONNACK:
|
|
//DBG_MQTT("MQTT: Connect successful\n");
|
|
// callbacks for internal and external clients
|
|
if (client->connectedCb) client->connectedCb(client);
|
|
if (client->cmdConnectedCb) client->cmdConnectedCb(client);
|
|
client->reconTimeout = 1; // reset the reconnect backoff
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_SUBACK:
|
|
if (pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: Subscribe successful\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_UNSUBACK:
|
|
if (pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: Unsubscribe successful\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PUBACK: // ack for a publish we sent
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: QoS1 Publish successful\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PUBREC: // rec for a publish we sent
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: QoS2 publish cont\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
// we need to send PUBREL
|
|
mqtt_msg_pubrel(&client->mqtt_connection, msg_id);
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PUBCOMP: // comp for a pubrel we sent (originally publish we sent)
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: QoS2 Publish successful\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PUBLISH: { // incoming publish
|
|
// we may need to ACK the publish
|
|
uint8_t msg_qos = mqtt_get_qos(client->in_buffer);
|
|
#ifdef MQTT_DBG
|
|
uint16_t topic_length = msg_len;
|
|
os_printf("MQTT: Recv PUBLISH qos=%d %s\n", msg_qos,
|
|
mqtt_get_publish_topic(client->in_buffer, &topic_length));
|
|
#endif
|
|
if (msg_qos == 1) mqtt_msg_puback(&client->mqtt_connection, msg_id);
|
|
if (msg_qos == 2) mqtt_msg_pubrec(&client->mqtt_connection, msg_id);
|
|
if (msg_qos == 1 || msg_qos == 2) {
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
}
|
|
// send the publish message to clients
|
|
deliver_publish(client, client->in_buffer, msg_len);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PUBREL: // rel for a rec we sent (originally publish received)
|
|
if (pending_msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg_id == msg_id) {
|
|
//DBG_MQTT("MQTT: Cont QoS2 recv\n");
|
|
client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
|
|
// we need to send PUBCOMP
|
|
mqtt_msg_pubcomp(&client->mqtt_connection, msg_id);
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
}
|
|
break;
|
|
|
|
case MQTT_MSG_TYPE_PINGRESP:
|
|
client->keepAliveAckTick = 0;
|
|
break;
|
|
}
|
|
|
|
// Shift out the message and see whether we have another one
|
|
if (msg_len < client->in_buffer_filled)
|
|
os_memcpy(client->in_buffer, client->in_buffer+msg_len, client->in_buffer_filled-msg_len);
|
|
client->in_buffer_filled -= msg_len;
|
|
} while(client->in_buffer_filled > 0 || len > 0);
|
|
|
|
// Send next packet out, if possible
|
|
if (!client->sending && client->pending_buffer == NULL && client->msgQueue != NULL) {
|
|
mqtt_send_message(client);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Callback from TCP that previous send completed
|
|
* @param arg: contain the ip link information
|
|
* @retval None
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_tcpclient_sent_cb(void* arg) {
|
|
//DBG_MQTT("MQTT: sent CB\n");
|
|
struct espconn* pCon = (struct espconn *)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
|
if (client == NULL) return; // aborted connection ?
|
|
//DBG_MQTT("MQTT: Sent\n");
|
|
|
|
// if the message we sent is not a "pending" one, we need to free the buffer
|
|
if (client->sending_buffer != NULL) {
|
|
PktBuf *buf = client->sending_buffer;
|
|
//DBG_MQTT("PktBuf free %p l=%d\n", buf, buf->filled);
|
|
os_free(buf);
|
|
client->sending_buffer = NULL;
|
|
}
|
|
client->sending = false;
|
|
|
|
// send next message if one is queued and we're not expecting an ACK
|
|
if (client->connState == MQTT_CONNECTED && client->pending_buffer == NULL &&
|
|
client->msgQueue != NULL) {
|
|
mqtt_send_message(client);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* @brief: Timer function to handle timeouts
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_timer(void* arg) {
|
|
MQTT_Client* client = (MQTT_Client*)arg;
|
|
//DBG_MQTT("MQTT: timer CB\n");
|
|
|
|
switch (client->connState) {
|
|
default: break;
|
|
|
|
case MQTT_CONNECTED:
|
|
// first check whether we're timing out for an ACK
|
|
if (client->pending_buffer != NULL && --client->timeoutTick == 0) {
|
|
// looks like we're not getting a response in time, abort the connection
|
|
mqtt_doAbort(client);
|
|
client->timeoutTick = 0; // trick to make reconnect happen in 1 second
|
|
return;
|
|
}
|
|
|
|
// check whether our last keep-alive timed out
|
|
if (client->keepAliveAckTick > 0 && --client->keepAliveAckTick == 0) {
|
|
os_printf("\nMQTT ERROR: Keep-alive timed out\n");
|
|
mqtt_doAbort(client);
|
|
return;
|
|
}
|
|
|
|
// check whether we need to send a keep-alive message
|
|
if (client->keepAliveTick > 0 && --client->keepAliveTick == 0) {
|
|
// timeout: we need to send a ping message
|
|
//DBG_MQTT("MQTT: Send keepalive\n");
|
|
mqtt_msg_pingreq(&client->mqtt_connection);
|
|
PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length);
|
|
os_memcpy(buf->data, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
buf->filled = client->mqtt_connection.message.length;
|
|
client->msgQueue = PktBuf_Unshift(client->msgQueue, buf);
|
|
mqtt_send_message(client);
|
|
client->keepAliveTick = client->connect_info.keepalive;
|
|
client->keepAliveAckTick = client->sendTimeout;
|
|
}
|
|
|
|
break;
|
|
|
|
case TCP_RECONNECT_REQ:
|
|
if (client->timeoutTick == 0 || --client->timeoutTick == 0) {
|
|
// it's time to reconnect! start by re-enqueueing anything pending
|
|
if (client->pending_buffer != NULL) {
|
|
client->msgQueue = PktBuf_Unshift(client->msgQueue, client->pending_buffer);
|
|
client->pending_buffer = NULL;
|
|
}
|
|
client->connect_info.clean_session = 0; // ask server to keep state
|
|
MQTT_Connect(client);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Callback from SDK that socket is disconnected
|
|
* @param arg: contain the ip link information
|
|
* @retval None
|
|
*/
|
|
void ICACHE_FLASH_ATTR
|
|
mqtt_tcpclient_discon_cb(void* arg) {
|
|
struct espconn* pespconn = (struct espconn *)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
|
|
DBG_MQTT("MQTT: Disconnect CB, freeing espconn %p\n", arg);
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
|
|
os_free(pespconn);
|
|
|
|
// if this is an aborted connection we're done
|
|
if (client == NULL) return;
|
|
DBG_MQTT("MQTT: Disconnected from %s:%d\n", client->host, client->port);
|
|
if (client->disconnectedCb) client->disconnectedCb(client);
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
|
|
|
|
// reconnect unless we're in a permanently disconnected state
|
|
if (client->connState == MQTT_DISCONNECTED) return;
|
|
client->timeoutTick = client->reconTimeout;
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
|
|
client->connState = TCP_RECONNECT_REQ;
|
|
}
|
|
|
|
/**
|
|
* @brief Callback from SDK that socket got reset, note that no discon_cb will follow
|
|
* @param arg: contain the ip link information
|
|
* @retval None
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_tcpclient_recon_cb(void* arg, int8_t err) {
|
|
struct espconn* pespconn = (struct espconn *)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
|
|
//DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
|
|
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
|
|
os_free(pespconn);
|
|
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port);
|
|
if (client->disconnectedCb) client->disconnectedCb(client);
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
|
|
|
|
// reconnect unless we're in a permanently disconnected state
|
|
if (client->connState == MQTT_DISCONNECTED) return;
|
|
client->timeoutTick = client->reconTimeout;
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
|
|
client->connState = TCP_RECONNECT_REQ;
|
|
os_printf("timeoutTick=%d reconTimeout=%d\n", client->timeoutTick, client->reconTimeout);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Callback from SDK that socket is connected
|
|
* @param arg: contain the ip link information
|
|
* @retval None
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_tcpclient_connect_cb(void* arg) {
|
|
struct espconn* pCon = (struct espconn *)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
|
if (client == NULL) return; // aborted connection
|
|
|
|
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);
|
|
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);
|
|
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);
|
|
os_printf("MQTT: TCP connected to %s:%d\n", client->host, client->port);
|
|
|
|
// send MQTT connect message to broker
|
|
mqtt_msg_connect(&client->mqtt_connection, &client->connect_info);
|
|
PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length);
|
|
os_memcpy(buf->data, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
buf->filled = client->mqtt_connection.message.length;
|
|
client->msgQueue = PktBuf_Unshift(client->msgQueue, buf); // prepend to send (rexmit) queue
|
|
mqtt_send_message(client);
|
|
client->connState = MQTT_CONNECTED; // v3.1.1 allows publishing while still connecting
|
|
}
|
|
|
|
/**
|
|
* @brief Allocate and enqueue mqtt message, kick sending, if appropriate
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len) {
|
|
PktBuf *buf = PktBuf_New(len);
|
|
os_memcpy(buf->data, data, len);
|
|
buf->filled = len;
|
|
client->msgQueue = PktBuf_Push(client->msgQueue, buf);
|
|
|
|
if (client->connState == MQTT_CONNECTED && !client->sending && client->pending_buffer == NULL) {
|
|
mqtt_send_message(client);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Send out top message in queue onto socket
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_send_message(MQTT_Client* client) {
|
|
//DBG_MQTT("MQTT: Send_message\n");
|
|
PktBuf *buf = client->msgQueue;
|
|
if (buf == NULL || client->sending) return; // ahem...
|
|
client->msgQueue = PktBuf_Shift(client->msgQueue);
|
|
|
|
// get some details about the message
|
|
uint16_t msg_type = mqtt_get_type(buf->data);
|
|
uint8_t msg_id = mqtt_get_id(buf->data, buf->filled);
|
|
#ifdef MQTT_DBG
|
|
os_printf("MQTT: Send type=%s id=%04X len=%d\n", mqtt_msg_type[msg_type], msg_id, buf->filled);
|
|
#if 0
|
|
for (int i=0; i<buf->filled; i++) {
|
|
if (buf->data[i] >= ' ' && buf->data[i] <= '~') os_printf("%c", buf->data[i]);
|
|
else os_printf("\\x%02X", buf->data[i]);
|
|
}
|
|
os_printf("\n");
|
|
#endif
|
|
#endif
|
|
|
|
// send the message out
|
|
if (client->security)
|
|
espconn_secure_sent(client->pCon, buf->data, buf->filled);
|
|
else
|
|
espconn_sent(client->pCon, buf->data, buf->filled);
|
|
client->sending = true;
|
|
|
|
// depending on whether it needs an ack we need to hold on to the message
|
|
bool needsAck =
|
|
(msg_type == MQTT_MSG_TYPE_PUBLISH && mqtt_get_qos(buf->data) > 0) ||
|
|
msg_type == MQTT_MSG_TYPE_PUBREL || msg_type == MQTT_MSG_TYPE_PUBREC ||
|
|
msg_type == MQTT_MSG_TYPE_SUBSCRIBE || msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE ||
|
|
msg_type == MQTT_MSG_TYPE_PINGREQ;
|
|
if (msg_type == MQTT_MSG_TYPE_PINGREQ) {
|
|
client->pending_buffer = NULL; // we don't need to rexmit this one
|
|
client->sending_buffer = buf;
|
|
} else if (needsAck) {
|
|
client->pending_buffer = buf; // remeber for rexmit on disconnect/reconnect
|
|
client->sending_buffer = NULL;
|
|
client->timeoutTick = client->sendTimeout+1; // +1 to ensure full sendTireout seconds
|
|
} else {
|
|
client->pending_buffer = NULL;
|
|
client->sending_buffer = buf;
|
|
client->timeoutTick = 0;
|
|
}
|
|
client->keepAliveTick = client->connect_info.keepalive > 0 ? client->connect_info.keepalive+1 : 0;
|
|
}
|
|
|
|
/**
|
|
* @brief DNS lookup for broker hostname completed, move to next phase
|
|
*/
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) {
|
|
struct espconn* pConn = (struct espconn *)arg;
|
|
MQTT_Client* client = (MQTT_Client *)pConn->reverse;
|
|
|
|
if (ipaddr == NULL) {
|
|
os_printf("MQTT: DNS lookup failed\n");
|
|
if (client != NULL) {
|
|
client->timeoutTick = client->reconTimeout;
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
|
|
client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
|
|
}
|
|
return;
|
|
}
|
|
DBG_MQTT("MQTT: ip %d.%d.%d.%d\n",
|
|
*((uint8 *)&ipaddr->addr),
|
|
*((uint8 *)&ipaddr->addr + 1),
|
|
*((uint8 *)&ipaddr->addr + 2),
|
|
*((uint8 *)&ipaddr->addr + 3));
|
|
|
|
if (client != NULL && client->ip.addr == 0 && ipaddr->addr != 0) {
|
|
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
|
|
uint8_t err;
|
|
if (client->security)
|
|
err = espconn_secure_connect(client->pCon);
|
|
else
|
|
err = espconn_connect(client->pCon);
|
|
if (err != 0) {
|
|
os_printf("MQTT ERROR: Failed to connect\n");
|
|
client->timeoutTick = client->reconTimeout;
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
|
|
client->connState = TCP_RECONNECT_REQ;
|
|
} else {
|
|
DBG_MQTT("MQTT: connecting...\n");
|
|
}
|
|
}
|
|
}
|
|
|
|
//===== publish / subscribe
|
|
|
|
static void ICACHE_FLASH_ATTR
|
|
msg_conn_init(mqtt_connection_t *new_msg, mqtt_connection_t *old_msg,
|
|
uint8_t *buf, uint16_t buflen) {
|
|
new_msg->message_id = old_msg->message_id;
|
|
new_msg->buffer = buf;
|
|
new_msg->buffer_length = buflen;
|
|
}
|
|
|
|
/**
|
|
* @brief MQTT publish function.
|
|
* @param client: MQTT_Client reference
|
|
* @param topic: string topic will publish to
|
|
* @param data: buffer data send point to
|
|
* @param data_length: length of data
|
|
* @param qos: qos
|
|
* @param retain: retain
|
|
* @retval TRUE if success queue
|
|
*/
|
|
bool ICACHE_FLASH_ATTR
|
|
MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint16_t data_length,
|
|
uint8_t qos, uint8_t retain)
|
|
{
|
|
// estimate the packet size to allocate a buffer
|
|
uint16_t topic_length = os_strlen(topic);
|
|
// estimate: fixed hdr, pkt-id, topic length, topic, data, fudge
|
|
uint16_t buf_len = 3 + 2 + 2 + topic_length + data_length + 16;
|
|
PktBuf *buf = PktBuf_New(buf_len);
|
|
if (buf == NULL) {
|
|
os_printf("MQTT ERROR: Cannot allocate buffer for %d byte publish\n", buf_len);
|
|
return FALSE;
|
|
}
|
|
// use a temporary mqtt_message_t pointing to our buffer, this is a bit of a mess because we
|
|
// need to keep track of the message_id that is embedded in it
|
|
mqtt_connection_t msg;
|
|
msg_conn_init(&msg, &client->mqtt_connection, buf->data, buf_len);
|
|
uint16_t msg_id;
|
|
if (!mqtt_msg_publish(&msg, topic, data, data_length, qos, retain, &msg_id)){
|
|
os_printf("MQTT ERROR: Queuing Publish failed\n");
|
|
os_free(buf);
|
|
return FALSE;
|
|
}
|
|
client->mqtt_connection.message_id = msg.message_id;
|
|
if (msg.message.data != buf->data)
|
|
os_memcpy(buf->data, msg.message.data, msg.message.length);
|
|
buf->filled = msg.message.length;
|
|
|
|
DBG_MQTT("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length);
|
|
//dumpMem(buf, buf_len);
|
|
client->msgQueue = PktBuf_Push(client->msgQueue, buf);
|
|
|
|
if (!client->sending && client->pending_buffer == NULL) {
|
|
mqtt_send_message(client);
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
/**
|
|
* @brief MQTT subscribe function.
|
|
* @param client: MQTT_Client reference
|
|
* @param topic: string topic will subscribe
|
|
* @param qos: qos
|
|
* @retval TRUE if success queue
|
|
*/
|
|
bool ICACHE_FLASH_ATTR
|
|
MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) {
|
|
uint16_t msg_id;
|
|
if (!mqtt_msg_subscribe(&client->mqtt_connection, topic, 0, &msg_id)) {
|
|
os_printf("MQTT ERROR: Queuing Subscribe failed (too long)\n");
|
|
return FALSE;
|
|
}
|
|
DBG_MQTT("MQTT: Subscribe, topic: \"%s\"\n", topic);
|
|
mqtt_enq_message(client, client->mqtt_connection.message.data,
|
|
client->mqtt_connection.message.length);
|
|
return TRUE;
|
|
}
|
|
|
|
//===== Initialization and connect/disconnect
|
|
|
|
/**
|
|
* @brief MQTT initialization mqtt client function
|
|
* @param client: MQTT_Client reference
|
|
* @param host: Domain or IP string
|
|
* @param port: Port to connect
|
|
* @param security: 1 for ssl, 0 for none
|
|
* @param clientid: MQTT client id
|
|
* @param client_user: MQTT client user
|
|
* @param client_pass: MQTT client password
|
|
* @param keepAliveTime: MQTT keep alive timer, in second
|
|
* @param cleanSession: On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag.
|
|
* If clean session is set to false, then the connection is treated as durable. This means that when the client
|
|
* disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until
|
|
* it connects again in the future. If clean session is true, then all subscriptions will be removed for the client
|
|
* when it disconnects.
|
|
* @retval None
|
|
*/
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_Init(MQTT_Client* client, char* host, uint32 port, uint8_t security, uint8_t sendTimeout,
|
|
char* client_id, char* client_user, char* client_pass,
|
|
uint8_t keepAliveTime) {
|
|
DBG_MQTT("MQTT_Init, host=%s\n", host);
|
|
|
|
os_memset(client, 0, sizeof(MQTT_Client));
|
|
|
|
client->host = (char*)os_zalloc(os_strlen(host) + 1);
|
|
os_strcpy(client->host, host);
|
|
|
|
client->port = port;
|
|
client->security = !!security;
|
|
|
|
// timeouts with sanity checks
|
|
client->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout;
|
|
client->reconTimeout = 1; // reset reconnect back-off
|
|
|
|
os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
|
|
|
client->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1);
|
|
os_strcpy(client->connect_info.client_id, client_id);
|
|
|
|
client->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1);
|
|
os_strcpy(client->connect_info.username, client_user);
|
|
|
|
client->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1);
|
|
os_strcpy(client->connect_info.password, client_pass);
|
|
|
|
client->connect_info.keepalive = keepAliveTime;
|
|
client->connect_info.clean_session = 1;
|
|
|
|
client->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE);
|
|
client->in_buffer_size = MQTT_MAX_RCV_MESSAGE;
|
|
|
|
uint8_t *out_buffer = (uint8_t *)os_zalloc(MQTT_MAX_SHORT_MESSAGE);
|
|
mqtt_msg_init(&client->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE);
|
|
}
|
|
|
|
/**
|
|
* @brief MQTT Set Last Will Topic, must be called before MQTT_Connect
|
|
*/
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_InitLWT(MQTT_Client* client, char* will_topic, char* will_msg,
|
|
uint8_t will_qos, uint8_t will_retain) {
|
|
|
|
client->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1);
|
|
os_strcpy((char*)client->connect_info.will_topic, will_topic);
|
|
|
|
client->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1);
|
|
os_strcpy((char*)client->connect_info.will_message, will_msg);
|
|
|
|
client->connect_info.will_qos = will_qos;
|
|
client->connect_info.will_retain = will_retain;
|
|
|
|
// TODO: if we're connected we should disconnect and reconnect to establish the new LWT
|
|
}
|
|
|
|
/**
|
|
* @brief Begin connect to MQTT broker
|
|
* @param client: MQTT_Client reference
|
|
* @retval None
|
|
*/
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_Connect(MQTT_Client* client) {
|
|
//MQTT_Disconnect(client);
|
|
client->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
|
|
client->pCon->type = ESPCONN_TCP;
|
|
client->pCon->state = ESPCONN_NONE;
|
|
client->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
|
|
client->pCon->proto.tcp->local_port = espconn_port();
|
|
client->pCon->proto.tcp->remote_port = client->port;
|
|
client->pCon->reverse = client;
|
|
espconn_regist_connectcb(client->pCon, mqtt_tcpclient_connect_cb);
|
|
espconn_regist_reconcb(client->pCon, mqtt_tcpclient_recon_cb);
|
|
|
|
// start timer function to tick every second
|
|
os_timer_disarm(&client->mqttTimer);
|
|
os_timer_setfn(&client->mqttTimer, (os_timer_func_t *)mqtt_timer, client);
|
|
os_timer_arm_us(&client->mqttTimer, 1 * 1000000, 1);
|
|
|
|
// initiate the TCP connection or DNS lookup
|
|
os_printf("MQTT: Connect to %s:%d %p (client=%p)\n",
|
|
client->host, client->port, client->pCon, client);
|
|
if (UTILS_StrToIP((const char *)client->host,
|
|
(void*)&client->pCon->proto.tcp->remote_ip)) {
|
|
uint8_t err;
|
|
if (client->security)
|
|
err = espconn_secure_connect(client->pCon);
|
|
else
|
|
err = espconn_connect(client->pCon);
|
|
if (err != 0) {
|
|
os_printf("MQTT ERROR: Failed to connect\n");
|
|
os_free(client->pCon->proto.tcp);
|
|
os_free(client->pCon);
|
|
client->pCon = NULL;
|
|
return;
|
|
}
|
|
} else {
|
|
espconn_gethostbyname(client->pCon, (const char *)client->host, &client->ip,
|
|
mqtt_dns_found);
|
|
}
|
|
|
|
client->connState = TCP_CONNECTING;
|
|
client->timeoutTick = 20; // generous timeout to allow for DNS, etc
|
|
client->sending = FALSE;
|
|
}
|
|
|
|
static void ICACHE_FLASH_ATTR
|
|
mqtt_doAbort(MQTT_Client* client) {
|
|
os_printf("MQTT: Disconnecting from %s:%d (%p)\n", client->host, client->port, client->pCon);
|
|
client->pCon->reverse = NULL; // ensure we jettison this pCon...
|
|
if (client->security)
|
|
espconn_secure_disconnect(client->pCon);
|
|
else
|
|
espconn_disconnect(client->pCon);
|
|
|
|
if (client->disconnectedCb) client->disconnectedCb(client);
|
|
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
|
|
|
|
if (client->sending_buffer != NULL) {
|
|
os_free(client->sending_buffer);
|
|
client->sending_buffer = NULL;
|
|
}
|
|
client->pCon = NULL; // it will be freed in disconnect callback
|
|
client->connState = TCP_RECONNECT_REQ;
|
|
client->timeoutTick = client->reconTimeout; // reconnect in a few seconds
|
|
if (client->reconTimeout < 128) client->reconTimeout <<= 1;
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_Reconnect(MQTT_Client* client) {
|
|
DBG_MQTT("MQTT: Reconnect requested\n");
|
|
if (client->connState == MQTT_DISCONNECTED)
|
|
MQTT_Connect(client);
|
|
else if (client->connState == MQTT_CONNECTED)
|
|
mqtt_doAbort(client);
|
|
// in other cases we're already in the reconnecting process
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_Disconnect(MQTT_Client* client) {
|
|
DBG_MQTT("MQTT: Disconnect requested\n");
|
|
os_timer_disarm(&client->mqttTimer);
|
|
if (client->connState == MQTT_DISCONNECTED) return;
|
|
if (client->connState == TCP_RECONNECT_REQ) {
|
|
client->connState = MQTT_DISCONNECTED;
|
|
return;
|
|
}
|
|
mqtt_doAbort(client);
|
|
//void *out_buffer = client->mqtt_connection.buffer;
|
|
//if (out_buffer != NULL) os_free(out_buffer);
|
|
client->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_Free(MQTT_Client* client) {
|
|
DBG_MQTT("MQTT: Free requested\n");
|
|
MQTT_Disconnect(client);
|
|
|
|
if (client->host) os_free(client->host);
|
|
client->host = NULL;
|
|
|
|
if (client->connect_info.client_id) os_free(client->connect_info.client_id);
|
|
if (client->connect_info.username) os_free(client->connect_info.username);
|
|
if (client->connect_info.password) os_free(client->connect_info.password);
|
|
os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
|
|
|
if (client->in_buffer) os_free(client->in_buffer);
|
|
client->in_buffer = NULL;
|
|
|
|
if (client->mqtt_connection.buffer) os_free(client->mqtt_connection.buffer);
|
|
os_memset(&client->mqtt_connection, 0, sizeof(client->mqtt_connection));
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_OnConnected(MQTT_Client* client, MqttCallback connectedCb) {
|
|
client->connectedCb = connectedCb;
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_OnDisconnected(MQTT_Client* client, MqttCallback disconnectedCb) {
|
|
client->disconnectedCb = disconnectedCb;
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_OnData(MQTT_Client* client, MqttDataCallback dataCb) {
|
|
client->dataCb = dataCb;
|
|
}
|
|
|
|
void ICACHE_FLASH_ATTR
|
|
MQTT_OnPublished(MQTT_Client* client, MqttCallback publishedCb) {
|
|
client->publishedCb = publishedCb;
|
|
}
|
|
|