fix mqtt for new slip protocol

pull/95/head
Thorsten von Eicken 9 years ago
parent 5a891c3c1e
commit 219b6c8006
  1. 17
      README.md
  2. 4
      cmd/cmd.c
  3. 14
      cmd/cmd.h
  4. 2
      cmd/handlers.c
  5. 23
      esp-link/mqtt_client.c
  6. 2
      esp-link/status.c
  7. 25
      mqtt/mqtt.c
  8. 12
      mqtt/mqtt.h
  9. 166
      mqtt/mqtt_cmd.c

@ -235,18 +235,29 @@ Troubleshooting
Building the firmware
---------------------
The firmware has been built using the [esp-open-sdk](https://github.com/pfalcon/esp-open-sdk)
on a Linux system. Create an esp8266 directory, install the esp-open-sdk into a sub-directory.
on a Linux system. Create an esp8266 directory, install the esp-open-sdk into a sub-directory
using the *non-standalone* install (i.e., there should not be an sdk directory in the esp-open-sdk
dir when done installing, if you use the standalone install you will get compilation errors
with std types, such as `uint32_t`).
Download the Espressif SDK (use the version mentioned in the release notes) from their
[download forum](http://bbs.espressif.com/viewforum.php?f=5) and also expand it into a
sub-directory. Then clone the esp-link repository into a third sub-directory.
sub-directory.
Clone the esp-link repository into a third sub-directory and check out the tag you would like,
such as `git checkout v2.1.7`.
This way the relative paths in the Makefile will work.
If you choose a different directory structure look at the Makefile for the appropriate environment
variables to define.
Do not use the source tarballs from the release page on github,
these will give you trouble compiling because the Makefile uses git to determine the esp-link
version being built.
In order to OTA-update the esp8266 you should `export ESP_HOSTNAME=...` with the hostname or
IP address of your module.
Now, build the code: `make` in the top-level of esp-link.
Now, build the code: `make` in the top-level of esp-link. If you want to se the commands being
issued, use `VERBOSE=1 make`.
A few notes from others (I can't fully verify these):
- You may need to install `zlib1g-dev` and `python-serial`

@ -34,7 +34,7 @@ cmdProtoWrite(uint8_t data) {
}
static void ICACHE_FLASH_ATTR
cmdProtoWriteBuf(uint8_t *data, short len) {
cmdProtoWriteBuf(const uint8_t *data, short len) {
while (len--) cmdProtoWrite(*data++);
}
@ -56,7 +56,7 @@ cmdResponseStart(uint16_t cmd, uint32_t value, uint16_t argc) {
// Adds data to a response, returns the partial CRC
void ICACHE_FLASH_ATTR
cmdResponseBody(void *data, uint16_t len) {
cmdResponseBody(const void *data, uint16_t len) {
cmdProtoWriteBuf((uint8_t*)&len, 2);
resp_crc = crc16_data((uint8_t*)&len, 2, resp_crc);

@ -40,18 +40,14 @@ typedef enum {
CMD_CB_EVENTS,
CMD_GET_TIME, // get current time in seconds since the unix epoch
CMD_MQTT_SETUP = 10,
CMD_MQTT_CONNECT,
CMD_MQTT_DISCONNECT,
CMD_MQTT_PUBLISH,
CMD_MQTT_SUBSCRIBE,
CMD_MQTT_LWT,
CMD_MQTT_EVENTS,
CMD_MQTT_SETUP = 10, // set-up callbacks
CMD_MQTT_PUBLISH, // publish a message
CMD_MQTT_SUBSCRIBE, // subscribe to a topic
CMD_MQTT_LWT, // set the last-will-topic and messge
CMD_REST_SETUP = 20,
CMD_REST_REQUEST,
CMD_REST_SETHEADER,
CMD_REST_EVENTS,
} CmdName;
typedef void (*cmdfunc_t)(CmdPacket *cmd);
@ -83,7 +79,7 @@ uint32_t cmdAddCb(char *name, uint32_t callback);
// Start a response
void cmdResponseStart(uint16_t cmd, uint32_t value, uint16_t argc);
// Adds data to a response
void cmdResponseBody(void* data, uint16_t len);
void cmdResponseBody(const void* data, uint16_t len);
// Ends a response
void cmdResponseEnd();

@ -35,8 +35,6 @@ const CmdList commands[] = {
{CMD_CB_ADD, "ADD_CB", cmdAddCallback},
#ifdef MQTT
{CMD_MQTT_SETUP, "MQTT_SETUP", MQTTCMD_Setup},
{CMD_MQTT_CONNECT, "MQTT_CONN", MQTTCMD_Connect},
{CMD_MQTT_DISCONNECT, "MQTT_DISCON", MQTTCMD_Disconnect},
{CMD_MQTT_PUBLISH, "MQTT_PUB", MQTTCMD_Publish},
{CMD_MQTT_SUBSCRIBE , "MQTT_SUB", MQTTCMD_Subscribe},
{CMD_MQTT_LWT, "MQTT_LWT", MQTTCMD_Lwt},

@ -18,34 +18,31 @@ static MqttCallback published_cb;
static MqttDataCallback data_cb;
void ICACHE_FLASH_ATTR
mqttConnectedCb(uint32_t *args) {
mqttConnectedCb(MQTT_Client* client) {
DBG("MQTT Client: Connected\n");
//MQTT_Client* client = (MQTT_Client*)args;
//MQTT_Subscribe(client, "system/time", 0); // handy for testing
if (connected_cb)
connected_cb(args);
connected_cb(client);
}
void ICACHE_FLASH_ATTR
mqttDisconnectedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
mqttDisconnectedCb(MQTT_Client* client) {
DBG("MQTT Client: Disconnected\n");
if (disconnected_cb)
disconnected_cb(args);
disconnected_cb(client);
}
void ICACHE_FLASH_ATTR
mqttPublishedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
mqttPublishedCb(MQTT_Client* client) {
DBG("MQTT Client: Published\n");
if (published_cb)
published_cb(args);
published_cb(client);
}
void ICACHE_FLASH_ATTR
mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len) {
// MQTT_Client* client = (MQTT_Client*)args;
mqttDataCb(MQTT_Client* client, const char* topic, uint32_t topic_len,
const char *data, uint32_t data_len)
{
#ifdef MQTTCLIENT_DBG
char *topicBuf = (char*)os_zalloc(topic_len + 1);
char *dataBuf = (char*)os_zalloc(data_len + 1);
@ -62,7 +59,7 @@ mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *da
#endif
if (data_cb)
data_cb(args, topic, topic_len, data, data_len);
data_cb(client, topic, topic_len, data, data_len);
}
void ICACHE_FLASH_ATTR

@ -37,7 +37,7 @@ static void ICACHE_FLASH_ATTR mqttStatusCb(void *v) {
char buf[128];
mqttStatusMsg(buf);
MQTT_Publish(&mqttClient, flashConfig.mqtt_status_topic, buf, 1, 0);
MQTT_Publish(&mqttClient, flashConfig.mqtt_status_topic, buf, os_strlen(buf), 1, 0);
}

@ -89,9 +89,9 @@ deliver_publish(MQTT_Client* client, uint8_t* message, uint16_t length) {
// callback to client
if (client->dataCb)
client->dataCb((uint32_t*)client, topic, topic_length, data, data_length);
client->dataCb(client, topic, topic_length, data, data_length);
if (client->cmdDataCb)
client->cmdDataCb((uint32_t*)client, topic, topic_length, data, data_length);
client->cmdDataCb(client, topic, topic_length, data, data_length);
}
/**
@ -164,8 +164,8 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) {
case MQTT_MSG_TYPE_CONNACK:
//DBG_MQTT("MQTT: Connect successful\n");
// callbacks for internal and external clients
if (client->connectedCb) client->connectedCb((uint32_t*)client);
if (client->cmdConnectedCb) client->cmdConnectedCb((uint32_t*)client);
if (client->connectedCb) client->connectedCb(client);
if (client->cmdConnectedCb) client->cmdConnectedCb(client);
client->reconTimeout = 1; // reset the reconnect backoff
break;
@ -357,8 +357,8 @@ mqtt_tcpclient_discon_cb(void* arg) {
// if this is an aborted connection we're done
if (client == NULL) return;
DBG_MQTT("MQTT: Disconnected from %s:%d\n", client->host, client->port);
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client);
if (client->disconnectedCb) client->disconnectedCb(client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
// reconnect unless we're in a permanently disconnected state
if (client->connState == MQTT_DISCONNECTED) return;
@ -380,8 +380,8 @@ mqtt_tcpclient_recon_cb(void* arg, int8_t err) {
if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
os_free(pespconn);
os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port);
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client);
if (client->disconnectedCb) client->disconnectedCb(client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
// reconnect unless we're in a permanently disconnected state
if (client->connState == MQTT_DISCONNECTED) return;
@ -547,10 +547,11 @@ msg_conn_init(mqtt_connection_t *new_msg, mqtt_connection_t *old_msg,
* @retval TRUE if success queue
*/
bool ICACHE_FLASH_ATTR
MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain) {
MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint16_t data_length,
uint8_t qos, uint8_t retain)
{
// estimate the packet size to allocate a buffer
uint16_t topic_length = os_strlen(topic);
uint16_t data_length = os_strlen(data);
// estimate: fixed hdr, pkt-id, topic length, topic, data, fudge
uint16_t buf_len = 3 + 2 + 2 + topic_length + data_length + 16;
PktBuf *buf = PktBuf_New(buf_len);
@ -738,8 +739,8 @@ mqtt_doAbort(MQTT_Client* client) {
else
espconn_disconnect(client->pCon);
if (client->disconnectedCb) client->disconnectedCb((uint32_t*)client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb((uint32_t*)client);
if (client->disconnectedCb) client->disconnectedCb(client);
if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
if (client->sending_buffer != NULL) {
os_free(client->sending_buffer);

@ -44,14 +44,16 @@ typedef enum {
MQTT_CONNECTED, // conneted (or connecting)
} tConnState;
typedef struct MQTT_Client MQTT_Client; // forward definition
// Simple notification callback
typedef void (*MqttCallback)(uint32_t* args);
typedef void (*MqttCallback)(MQTT_Client *client);
// Callback with data messge
typedef void (*MqttDataCallback)(uint32_t* args, const char* topic, uint32_t topic_len,
typedef void (*MqttDataCallback)(MQTT_Client *client, const char* topic, uint32_t topic_len,
const char* data, uint32_t data_len);
// MQTTY client data structure
typedef struct {
struct MQTT_Client {
struct espconn* pCon; // socket
// connection information
char* host; // MQTT server
@ -89,7 +91,7 @@ typedef struct {
MqttDataCallback cmdDataCb;
// misc
void* user_data;
} MQTT_Client;
};
// Initialize client data structure
void MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port,
@ -119,7 +121,7 @@ void MQTT_Disconnect(MQTT_Client* mqttClient);
bool MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos);
// Publish a message
bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data,
bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint16_t data_len,
uint8_t qos, uint8_t retain);
// Callback when connected

@ -13,74 +13,51 @@
#define DBG(format, ...) do { } while(0)
#endif
// if MQTT_1_CLIENT is defined we only support the one client that is built into esp-link.
// this keeps everything simpler. Undefining it brings back old code that supports creating
// a new client and setting all its params. Most likely that old code no longer works...
#define MQTT_1_CLIENT
// callbacks to the attached uC
uint32_t connectedCb = 0, disconnectCb = 0, publishedCb = 0, dataCb = 0;
void ICACHE_FLASH_ATTR
cmdMqttConnectedCb(uint32_t* args) {
MQTT_Client* client = (MQTT_Client*)args;
cmdMqttConnectedCb(MQTT_Client* client) {
MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
DBG("MQTT: Connected connectedCb=%p, disconnectedCb=%p, publishedCb=%p, dataCb=%p\n",
(void*)cb->connectedCb,
(void*)cb->disconnectedCb,
(void*)cb->publishedCb,
(void*)cb->dataCb);
uint16_t crc = cmdResponseStart(CMD_MQTT_EVENTS, cb->connectedCb, 0, 0);
cmdResponseEnd(crc);
DBG("MQTT: Connected Cb=%p\n", (void*)cb->connectedCb);
cmdResponseStart(CMD_RESP_CB, cb->connectedCb, 0);
cmdResponseEnd();
}
void ICACHE_FLASH_ATTR
cmdMqttDisconnectedCb(uint32_t* args) {
MQTT_Client* client = (MQTT_Client*)args;
cmdMqttDisconnectedCb(MQTT_Client* client) {
MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
DBG("MQTT: Disconnected\n");
uint16_t crc = cmdResponseStart(CMD_MQTT_EVENTS, cb->disconnectedCb, 0, 0);
cmdResponseEnd(crc);
DBG("MQTT: Disconnected cb=%p\n", (void*)cb->disconnectedCb);
cmdResponseStart(CMD_RESP_CB, cb->disconnectedCb, 0);
cmdResponseEnd();
}
void ICACHE_FLASH_ATTR
cmdMqttPublishedCb(uint32_t* args) {
MQTT_Client* client = (MQTT_Client*)args;
cmdMqttPublishedCb(MQTT_Client* client) {
MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
DBG("MQTT: Published\n");
uint16_t crc = cmdResponseStart(CMD_MQTT_EVENTS, cb->publishedCb, 0, 0);
cmdResponseEnd(crc);
DBG("MQTT: Published cb=%p\n", (void*)cb->publishedCb);
cmdResponseStart(CMD_RESP_CB, cb->publishedCb, 0);
cmdResponseEnd();
}
void ICACHE_FLASH_ATTR
cmdMqttDataCb(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t data_len) {
uint16_t crc = 0;
MQTT_Client* client = (MQTT_Client*)args;
cmdMqttDataCb(MQTT_Client* client, const char* topic, uint32_t topic_len,
const char* data, uint32_t data_len)
{
MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
DBG("MQTT: Data cb=%p topic=%s len=%ld\n", (void*)cb->dataCb, topic, data_len);
crc = cmdResponseStart(CMD_MQTT_EVENTS, cb->dataCb, 0, 2);
crc = cmdResponseBody(crc, (uint8_t*)topic, topic_len);
crc = cmdResponseBody(crc, (uint8_t*)data, data_len);
cmdResponseEnd(crc);
cmdResponseStart(CMD_RESP_CB, cb->dataCb, 2);
cmdResponseBody(topic, topic_len);
cmdResponseBody(data, data_len);
cmdResponseEnd();
}
uint32_t ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTTCMD_Lwt(CmdPacket *cmd) {
CmdRequest req;
cmdRequest(&req, cmd);
if (cmdGetArgc(&req) != 5)
return 0;
if (cmdGetArgc(&req) != 4) return;
// get mqtt client
uint32_t client_ptr;
cmdPopArg(&req, (uint8_t*)&client_ptr, 4);
#ifdef MQTT_1_CLIENT
MQTT_Client* client = &mqttClient;
#else
MQTT_Client* client = (MQTT_Client*)client_ptr;
DBG("MQTT: MQTTCMD_Lwt client ptr=%p\n", (void*)client_ptr);
#endif
// free old topic & message
if (client->connect_info.will_topic)
@ -92,14 +69,14 @@ MQTTCMD_Lwt(CmdPacket *cmd) {
// get topic
len = cmdArgLen(&req);
if (len > 128) return 0; // safety check
if (len > 128) return; // safety check
client->connect_info.will_topic = (char*)os_zalloc(len + 1);
cmdPopArg(&req, client->connect_info.will_topic, len);
client->connect_info.will_topic[len] = 0;
// get message
len = cmdArgLen(&req);
if (len > 128) return 0; // safety check
if (len > 128) return; // safety check
client->connect_info.will_message = (char*)os_zalloc(len + 1);
cmdPopArg(&req, client->connect_info.will_message, len);
client->connect_info.will_message[len] = 0;
@ -118,32 +95,22 @@ MQTTCMD_Lwt(CmdPacket *cmd) {
// trigger a reconnect to set the LWT
MQTT_Reconnect(client);
return 1;
}
uint32_t ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTTCMD_Publish(CmdPacket *cmd) {
CmdRequest req;
cmdRequest(&req, cmd);
if (cmdGetArgc(&req) != 6)
return 0;
if (cmdGetArgc(&req) != 5) return;
// get mqtt client
uint32_t client_ptr;
cmdPopArg(&req, (uint8_t*)&client_ptr, 4);
#ifdef MQTT_1_CLIENT
MQTT_Client* client = &mqttClient;
#else
MQTT_Client* client = (MQTT_Client*)client_ptr;
DBG("MQTT: MQTTCMD_Publish client ptr=%p\n", (void*)client_ptr);
#endif
uint16_t len;
// get topic
len = cmdArgLen(&req);
if (len > 128) return 0; // safety check
if (len > 128) return; // safety check
uint8_t *topic = (uint8_t*)os_zalloc(len + 1);
cmdPopArg(&req, topic, len);
topic[len] = 0;
@ -153,58 +120,46 @@ MQTTCMD_Publish(CmdPacket *cmd) {
uint8_t *data = (uint8_t*)os_zalloc(len+1);
if (!data) { // safety check
os_free(topic);
return 0;
return;
}
cmdPopArg(&req, data, len);
data[len] = 0;
uint32_t qos, retain, data_len;
uint16_t data_len;
uint8_t qos, retain;
// get data length
// this isn't used but we have to pull it off the stack
cmdPopArg(&req, (uint8_t*)&data_len, 4);
cmdPopArg(&req, &data_len, sizeof(data_len));
// get qos
cmdPopArg(&req, (uint8_t*)&qos, 4);
cmdPopArg(&req, &qos, sizeof(qos));
// get retain
cmdPopArg(&req, (uint8_t*)&retain, 4);
cmdPopArg(&req, &retain, sizeof(retain));
DBG("MQTT: MQTTCMD_Publish topic=%s, data_len=%d, qos=%ld, retain=%ld\n",
topic,
os_strlen((char*)data),
qos,
retain);
DBG("MQTT: MQTTCMD_Publish topic=%s, data_len=%d, qos=%d, retain=%d\n",
topic, data_len, qos, retain);
MQTT_Publish(client, (char*)topic, (char*)data, (uint8_t)qos, (uint8_t)retain);
MQTT_Publish(client, (char*)topic, (char*)data, data_len, qos%3, retain&1);
os_free(topic);
os_free(data);
return 1;
return;
}
uint32_t ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTTCMD_Subscribe(CmdPacket *cmd) {
CmdRequest req;
cmdRequest(&req, cmd);
if (cmdGetArgc(&req) != 3)
return 0;
if (cmdGetArgc(&req) != 2) return;
// get mqtt client
uint32_t client_ptr;
cmdPopArg(&req, (uint8_t*)&client_ptr, 4);
#ifdef MQTT_1_CLIENT
MQTT_Client* client = &mqttClient;
#else
MQTT_Client* client = (MQTT_Client*)client_ptr;
DBG("MQTT: MQTTCMD_Subscribe client ptr=%p\n", (void*)client_ptr);
#endif
uint16_t len;
// get topic
len = cmdArgLen(&req);
if (len > 128) return 0; // safety check
if (len > 128) return; // safety check
uint8_t* topic = (uint8_t*)os_zalloc(len + 1);
cmdPopArg(&req, topic, len);
topic[len] = 0;
@ -217,22 +172,19 @@ MQTTCMD_Subscribe(CmdPacket *cmd) {
MQTT_Subscribe(client, (char*)topic, (uint8_t)qos);
os_free(topic);
return 1;
return;
}
uint32_t ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTTCMD_Setup(CmdPacket *cmd) {
CmdRequest req;
cmdRequest(&req, cmd);
#ifdef MQTT_1_CLIENT
MQTT_Client* client = &mqttClient;
cmdSkipArg(&req);
cmdSkipArg(&req);
cmdSkipArg(&req);
cmdSkipArg(&req);
cmdSkipArg(&req);
#else
if (cmdGetArgc(&req) != 4) return;
#if 0
if (cmdGetArgc(&req) != 9)
return 0;
@ -287,27 +239,28 @@ MQTTCMD_Setup(CmdPacket *cmd) {
// create callback
MqttCmdCb* callback = (MqttCmdCb*)os_zalloc(sizeof(MqttCmdCb));
uint32_t cb_data;
cmdPopArg(&req, (uint8_t*)&cb_data, 4);
callback->connectedCb = cb_data;
cmdPopArg(&req, (uint8_t*)&cb_data, 4);
callback->disconnectedCb = cb_data;
cmdPopArg(&req, (uint8_t*)&cb_data, 4);
callback->publishedCb = cb_data;
cmdPopArg(&req, (uint8_t*)&cb_data, 4);
callback->dataCb = cb_data;
cmdPopArg(&req, &callback->connectedCb, 4);
cmdPopArg(&req, &callback->disconnectedCb, 4);
cmdPopArg(&req, &callback->publishedCb, 4);
cmdPopArg(&req, &callback->dataCb, 4);
client->user_data = callback;
DBG("MQTT connectedCb=%lx\n", callback->connectedCb);
client->cmdConnectedCb = cmdMqttConnectedCb;
client->cmdDisconnectedCb = cmdMqttDisconnectedCb;
client->cmdPublishedCb = cmdMqttPublishedCb;
client->cmdDataCb = cmdMqttDataCb;
return 0xf00df00d; //(uint32_t)client;
if (client->connState == MQTT_CONNECTED) {
if (callback->connectedCb)
cmdMqttConnectedCb(client);
} else if (callback->disconnectedCb) {
cmdMqttDisconnectedCb(client);
}
}
#if 0
uint32_t ICACHE_FLASH_ATTR
MQTTCMD_Connect(CmdPacket *cmd) {
CmdRequest req;
@ -383,3 +336,4 @@ MQTTCMD_Disconnect(CmdPacket *cmd) {
return 1;
#endif
}
#endif

Loading…
Cancel
Save