From 248f2f3f47b6aa4ce46668f040805d868a49ca07 Mon Sep 17 00:00:00 2001 From: Benjamin Runnels Date: Sat, 29 Aug 2015 16:13:52 -0500 Subject: [PATCH] mergeing mqtt into master --- Makefile | 2 +- cmd/handlers.c | 7 +- cmd/mqtt_cmd.c | 336 ++++++++++++++++++ cmd/mqtt_cmd.h | 22 ++ cmd/rest.c | 2 - cmd/rest.h | 1 + esp-link.vcxproj | 31 +- {user => esp-link}/cgi.c | 0 {user => esp-link}/cgi.h | 0 {user => esp-link}/cgiflash.c | 0 {user => esp-link}/cgiflash.h | 0 {user => esp-link}/cgipins.c | 0 {user => esp-link}/cgipins.h | 0 {user => esp-link}/cgitcp.c | 0 {user => esp-link}/cgitcp.h | 0 {user => esp-link}/cgiwifi.c | 0 {user => esp-link}/cgiwifi.h | 4 +- {user => esp-link}/config.c | 0 {user => esp-link}/config.h | 0 {user => esp-link}/log.c | 0 {user => esp-link}/log.h | 0 esp-link/main.c | 168 +++++++++ {user => esp-link}/status.c | 0 {user => esp-link}/status.h | 0 httpd/httpdespfs.c | 11 +- httpd/httpdespfs.h | 6 +- include/esp8266.h | 5 +- include/user_config.h | 19 + mqtt/mqtt.c | 635 ++++++++++++++++++++++++++++++++++ mqtt/mqtt.h | 144 ++++++++ mqtt/mqtt_msg.c | 451 ++++++++++++++++++++++++ mqtt/mqtt_msg.h | 117 +++++++ mqtt/proto.c | 86 +++++ mqtt/proto.h | 21 ++ mqtt/queue.c | 53 +++ mqtt/queue.h | 46 +++ mqtt/ringbuf.c | 63 ++++ mqtt/ringbuf.h | 17 + user/user_main.c | 203 +++-------- 39 files changed, 2286 insertions(+), 164 deletions(-) create mode 100644 cmd/mqtt_cmd.c create mode 100644 cmd/mqtt_cmd.h rename {user => esp-link}/cgi.c (100%) rename {user => esp-link}/cgi.h (100%) rename {user => esp-link}/cgiflash.c (100%) rename {user => esp-link}/cgiflash.h (100%) rename {user => esp-link}/cgipins.c (100%) rename {user => esp-link}/cgipins.h (100%) rename {user => esp-link}/cgitcp.c (100%) rename {user => esp-link}/cgitcp.h (100%) rename {user => esp-link}/cgiwifi.c (100%) rename {user => esp-link}/cgiwifi.h (72%) rename {user => esp-link}/config.c (100%) rename {user => esp-link}/config.h (100%) rename {user => esp-link}/log.c (100%) rename {user => esp-link}/log.h (100%) create mode 100644 esp-link/main.c rename {user => esp-link}/status.c (100%) rename {user => esp-link}/status.h (100%) create mode 100644 mqtt/mqtt.c create mode 100644 mqtt/mqtt.h create mode 100644 mqtt/mqtt_msg.c create mode 100644 mqtt/mqtt_msg.h create mode 100644 mqtt/proto.c create mode 100644 mqtt/proto.h create mode 100644 mqtt/queue.c create mode 100644 mqtt/queue.h create mode 100644 mqtt/ringbuf.c create mode 100644 mqtt/ringbuf.h diff --git a/Makefile b/Makefile index 66bcbe9..451d7f3 100644 --- a/Makefile +++ b/Makefile @@ -143,7 +143,7 @@ TARGET = httpd APPGEN_TOOL ?= gen_appbin.py # which modules (subdirectories) of the project to include in compiling -MODULES = espfs httpd user serial cmd +MODULES = espfs httpd user serial cmd mqtt esp-link EXTRA_INCDIR = include . # libraries used in this project, mainly provided by the SDK diff --git a/cmd/handlers.c b/cmd/handlers.c index 8b37a18..b54773f 100644 --- a/cmd/handlers.c +++ b/cmd/handlers.c @@ -9,6 +9,7 @@ #include "serbridge.h" #include "uart.h" #include "cgiwifi.h" +#include "mqtt_cmd.h" static uint32_t CMD_Null(CmdPacket *cmd); static uint32_t CMD_IsReady(CmdPacket *cmd); @@ -18,6 +19,7 @@ static uint32_t CMD_AddCallback(CmdPacket *cmd); // keep track of last status sent to uC so we can notify it when it changes static uint8_t lastWifiStatus = wifiIsDisconnected; +static bool wifiCbAdded = false; // Command dispatch table for serial -> ESP commands const CmdList commands[] = { @@ -131,7 +133,10 @@ CMD_WifiConnect(CmdPacket *cmd) { if(cmd->argc != 2 || cmd->callback == 0) return 0; - wifiStatusCb = CMD_WifiCb; // register our callback with wifi subsystem + if (!wifiCbAdded) { + wifiAddStateChangeCb(CMD_WifiCb); // register our callback with wifi subsystem + wifiCbAdded = true; + } CMD_AddCb("wifiCb", (uint32_t)cmd->callback); // save the MCU's callback lastWifiStatus = 0xff; // set to invalid value so we immediately send status cb in all cases CMD_WifiCb(wifiState); diff --git a/cmd/mqtt_cmd.c b/cmd/mqtt_cmd.c new file mode 100644 index 0000000..358ad48 --- /dev/null +++ b/cmd/mqtt_cmd.c @@ -0,0 +1,336 @@ +#include "mqtt_cmd.h" + +uint32_t connectedCb = 0, disconnectCb = 0, tcpDisconnectedCb = 0, publishedCb = 0, dataCb = 0; + +void ICACHE_FLASH_ATTR +cmdMqttConnectedCb(uint32_t* args) { + MQTT_Client* client = (MQTT_Client*)args; + MqttCmdCb* cb = (MqttCmdCb*)client->user_data; + os_printf("MQTT: Connected connectedCb=%p, disconnectedCb=%p, publishedCb=%p, dataCb=%p\n", + (void*)cb->connectedCb, + (void*)cb->disconnectedCb, + (void*)cb->publishedCb, + (void*)cb->dataCb); + uint16_t crc = CMD_ResponseStart(CMD_MQTT_EVENTS, cb->connectedCb, 0, 0); + CMD_ResponseEnd(crc); +} + +void ICACHE_FLASH_ATTR +cmdMqttTcpDisconnectedCb(uint32_t *args) { + MQTT_Client* client = (MQTT_Client*)args; + MqttCmdCb *cb = (MqttCmdCb*)client->user_data; + os_printf("MQTT: TCP Disconnected\n"); + uint16_t crc = CMD_ResponseStart(CMD_MQTT_EVENTS, cb->tcpDisconnectedCb, 0, 0); + CMD_ResponseEnd(crc); +} + +void ICACHE_FLASH_ATTR +cmdMqttDisconnectedCb(uint32_t* args) { + MQTT_Client* client = (MQTT_Client*)args; + MqttCmdCb* cb = (MqttCmdCb*)client->user_data; + os_printf("MQTT: Disconnected\n"); + uint16_t crc = CMD_ResponseStart(CMD_MQTT_EVENTS, cb->disconnectedCb, 0, 0); + CMD_ResponseEnd(crc); +} + +void ICACHE_FLASH_ATTR +cmdMqttPublishedCb(uint32_t* args) { + MQTT_Client* client = (MQTT_Client*)args; + MqttCmdCb* cb = (MqttCmdCb*)client->user_data; + os_printf("MQTT: Published\n"); + uint16_t crc = CMD_ResponseStart(CMD_MQTT_EVENTS, cb->publishedCb, 0, 0); + CMD_ResponseEnd(crc); +} + +void ICACHE_FLASH_ATTR +cmdMqttDataCb(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t data_len) { + uint16_t crc = 0; + MQTT_Client* client = (MQTT_Client*)args; + MqttCmdCb* cb = (MqttCmdCb*)client->user_data; + + crc = CMD_ResponseStart(CMD_MQTT_EVENTS, cb->dataCb, 0, 2); + crc = CMD_ResponseBody(crc, (uint8_t*)topic, topic_len); + crc = CMD_ResponseBody(crc, (uint8_t*)data, data_len); + CMD_ResponseEnd(crc); +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Setup(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 9) + return 0; + + // create mqtt client + uint8_t clientLen = sizeof(MQTT_Client); + MQTT_Client* client = (MQTT_Client*)os_zalloc(clientLen); + if (client == NULL) + return 0; + os_memset(client, 0, clientLen); + + uint16_t len; + uint8_t *client_id, *user_data, *pass_data; + uint32_t keepalive, clean_session, cb_data; + + // get client id + len = CMD_ArgLen(&req); + if (len > 32) return 0; // safety check + client_id = (uint8_t*)os_zalloc(len + 1); + CMD_PopArg(&req, client_id, len); + client_id[len] = 0; + + // get username + len = CMD_ArgLen(&req); + if (len > 32) return 0; // safety check + user_data = (uint8_t*)os_zalloc(len + 1); + CMD_PopArg(&req, user_data, len); + user_data[len] = 0; + + // get password + len = CMD_ArgLen(&req); + if (len > 32) return 0; // safety check + pass_data = (uint8_t*)os_zalloc(len + 1); + CMD_PopArg(&req, pass_data, len); + pass_data[len] = 0; + + // get keepalive + CMD_PopArg(&req, (uint8_t*)&keepalive, 4); + + // get clean session + CMD_PopArg(&req, (uint8_t*)&clean_session, 4); + + os_printf("MQTT: MQTTCMD_Setup clientid=%s, user=%s, pw=%s, keepalive=%ld, clean_session=%ld\n", client_id, user_data, pass_data, keepalive, clean_session); + + // init client + // TODO: why malloc these all here, pass to MQTT_InitClient to be malloc'd again? + MQTT_InitClient(client, (char*)client_id, (char*)user_data, (char*)pass_data, keepalive, clean_session); + + // create callback + MqttCmdCb* callback = (MqttCmdCb*)os_zalloc(sizeof(MqttCmdCb)); + + CMD_PopArg(&req, (uint8_t*)&cb_data, 4); + callback->connectedCb = cb_data; + CMD_PopArg(&req, (uint8_t*)&cb_data, 4); + callback->disconnectedCb = cb_data; + CMD_PopArg(&req, (uint8_t*)&cb_data, 4); + callback->publishedCb = cb_data; + CMD_PopArg(&req, (uint8_t*)&cb_data, 4); + callback->dataCb = cb_data; + + client->user_data = callback; + + client->cmdConnectedCb = cmdMqttConnectedCb; + client->cmdDisconnectedCb = cmdMqttDisconnectedCb; + client->cmdPublishedCb = cmdMqttPublishedCb; + client->cmdDataCb = cmdMqttDataCb; + + if (CMD_GetArgc(&req) == 10) { + CMD_PopArg(&req, (uint8_t*)&cb_data, 4); + callback->tcpDisconnectedCb = cb_data; + client->cmdTcpDisconnectedCb = cmdMqttTcpDisconnectedCb; + } + + os_free(client_id); + os_free(user_data); + os_free(pass_data); + + return (uint32_t)client; +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Lwt(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 5) + return 0; + + // get mqtt client + uint32_t client_ptr; + CMD_PopArg(&req, (uint8_t*)&client_ptr, 4); + MQTT_Client* client = (MQTT_Client*)client_ptr; + os_printf("MQTT: MQTTCMD_Lwt client ptr=%p\n", (void*)client_ptr); + + uint16_t len; + + // get topic + if (client->connect_info.will_topic) + os_free(client->connect_info.will_topic); + len = CMD_ArgLen(&req); + if (len > 128) return 0; // safety check + client->connect_info.will_topic = (char*)os_zalloc(len + 1); + CMD_PopArg(&req, client->connect_info.will_topic, len); + client->connect_info.will_topic[len] = 0; + + // get message + if (client->connect_info.will_message) + os_free(client->connect_info.will_message); + len = CMD_ArgLen(&req); + // TODO: safety check + client->connect_info.will_message = (char*)os_zalloc(len + 1); + CMD_PopArg(&req, client->connect_info.will_message, len); + client->connect_info.will_message[len] = 0; + + // get qos + CMD_PopArg(&req, (uint8_t*)&client->connect_info.will_qos, 4); + + // get retain + CMD_PopArg(&req, (uint8_t*)&client->connect_info.will_retain, 4); + + os_printf("MQTT: MQTTCMD_Lwt topic=%s, message=%s, qos=%d, retain=%d\n", + client->connect_info.will_topic, + client->connect_info.will_message, + client->connect_info.will_qos, + client->connect_info.will_retain); + return 1; +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Connect(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 4) + return 0; + + // get mqtt client + uint32_t client_ptr; + CMD_PopArg(&req, (uint8_t*)&client_ptr, 4); + MQTT_Client* client = (MQTT_Client*)client_ptr; + os_printf("MQTT: MQTTCMD_Connect client ptr=%p\n", (void*)client_ptr); + + uint16_t len; + + // get host + if (client->host) + os_free(client->host); + len = CMD_ArgLen(&req); + if (len > 128) return 0; // safety check + client->host = (char*)os_zalloc(len + 1); + CMD_PopArg(&req, client->host, len); + client->host[len] = 0; + + // get port + CMD_PopArg(&req, (uint8_t*)&client->port, 4); + + // get security + CMD_PopArg(&req, (uint8_t*)&client->security, 4); + + os_printf("MQTT: MQTTCMD_Connect host=%s, port=%ld, security=%d\n", + client->host, + client->port, + client->security); + + MQTT_Connect(client); + return 1; +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Disconnect(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 1) + return 0; + + // get mqtt client + uint32_t client_ptr; + CMD_PopArg(&req, (uint8_t*)&client_ptr, 4); + MQTT_Client* client = (MQTT_Client*)client_ptr; + os_printf("MQTT: MQTTCMD_Disconnect client ptr=%p\n", (void*)client_ptr); + + // disconnect + MQTT_Disconnect(client); + return 1; +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Publish(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 6) + return 0; + + // get mqtt client + uint32_t client_ptr; + CMD_PopArg(&req, (uint8_t*)&client_ptr, 4); + MQTT_Client* client = (MQTT_Client*)client_ptr; + os_printf("MQTT: MQTTCMD_Publish client ptr=%p\n", (void*)client_ptr); + + uint16_t len; + uint8_t *topic, *data; + uint32_t qos = 0, retain = 0, data_len; + + // get topic + len = CMD_ArgLen(&req); + if (len > 128) return 0; // safety check + topic = (uint8_t*)os_zalloc(len + 1); + CMD_PopArg(&req, topic, len); + topic[len] = 0; + + // get data + len = CMD_ArgLen(&req); + // TODO: Safety check + // TODO: this was orignially zalloc len not len+1 + data = (uint8_t*)os_zalloc(len+1); + CMD_PopArg(&req, data, len); + // TODO: next line not originally present + data[len] = 0; + + // get data length + // TODO: this isn't used but we have to pull it off the stack + CMD_PopArg(&req, (uint8_t*)&data_len, 4); + + // get qos + CMD_PopArg(&req, (uint8_t*)&qos, 4); + + // get retain + CMD_PopArg(&req, (uint8_t*)&retain, 4); + + os_printf("MQTT: MQTTCMD_Publish topic=%s, data_len=%d, qos=%ld, retain=%ld\n", + topic, + os_strlen((char*)data), + qos, + retain); + + MQTT_Publish(client, (char*)topic, (char*)data, (uint8_t)qos, (uint8_t)retain); + os_free(topic); + os_free(data); + return 1; +} + +uint32_t ICACHE_FLASH_ATTR +MQTTCMD_Subscribe(CmdPacket *cmd) { + CmdRequest req; + CMD_Request(&req, cmd); + + if (CMD_GetArgc(&req) != 3) + return 0; + + // get mqtt client + uint32_t client_ptr; + CMD_PopArg(&req, (uint8_t*)&client_ptr, 4); + MQTT_Client* client = (MQTT_Client*)client_ptr; + os_printf("MQTT: MQTTCMD_Subscribe client ptr=%p\n", (void*)client_ptr); + + uint16_t len; + uint8_t* topic; + uint32_t qos = 0; + + // get topic + len = CMD_ArgLen(&req); + if (len > 128) return 0; // safety check + topic = (uint8_t*)os_zalloc(len + 1); + CMD_PopArg(&req, topic, len); + topic[len] = 0; + + // get qos + CMD_PopArg(&req, (uint8_t*)&qos, 4); + + os_printf("MQTT: MQTTCMD_Subscribe topic=%s, qos=%ld\n", topic, qos); + MQTT_Subscribe(client, (char*)topic, (uint8_t)qos); + os_free(topic); + return 1; +} diff --git a/cmd/mqtt_cmd.h b/cmd/mqtt_cmd.h new file mode 100644 index 0000000..9ccbd08 --- /dev/null +++ b/cmd/mqtt_cmd.h @@ -0,0 +1,22 @@ +#ifndef MODULES_MQTT_CMD_H_ +#define MODULES_MQTT_CMD_H_ + +#include "cmd.h" +#include "mqtt.h" + +typedef struct { + uint32_t connectedCb; + uint32_t disconnectedCb; + uint32_t publishedCb; + uint32_t dataCb; + uint32_t tcpDisconnectedCb; +} MqttCmdCb; + +uint32_t MQTTCMD_Connect(CmdPacket *cmd); +uint32_t MQTTCMD_Disconnect(CmdPacket *cmd); +uint32_t MQTTCMD_Setup(CmdPacket *cmd); +uint32_t MQTTCMD_Publish(CmdPacket *cmd); +uint32_t MQTTCMD_Subscribe(CmdPacket *cmd); +uint32_t MQTTCMD_Lwt(CmdPacket *cmd); + +#endif /* MODULES_MQTT_CMD_H_ */ diff --git a/cmd/rest.c b/cmd/rest.c index 5bc3da7..3556681 100644 --- a/cmd/rest.c +++ b/cmd/rest.c @@ -16,8 +16,6 @@ static RestClient restClient[MAX_REST]; static uint8_t restNum = 0xff; // index into restClient for next slot to allocate #define REST_CB 0xbeef0000 // fudge added to callback for arduino so we can detect problems -extern uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const char* str, void *ip); - static void ICACHE_FLASH_ATTR tcpclient_discon_cb(void *arg) { struct espconn *pespconn = (struct espconn *)arg; diff --git a/cmd/rest.h b/cmd/rest.h index 7161e1f..9c6cd5c 100644 --- a/cmd/rest.h +++ b/cmd/rest.h @@ -36,5 +36,6 @@ typedef struct { uint32_t REST_Setup(CmdPacket *cmd); uint32_t REST_Request(CmdPacket *cmd); uint32_t REST_SetHeader(CmdPacket *cmd); +uint8_t UTILS_StrToIP(const char* str, void *ip); #endif /* MODULES_INCLUDE_API_H_ */ diff --git a/esp-link.vcxproj b/esp-link.vcxproj index 918ed96..f371f1a 100644 --- a/esp-link.vcxproj +++ b/esp-link.vcxproj @@ -28,7 +28,7 @@ __ets__;_STDINT_H;ICACHE_FLASH;__MINGW32__;__WIN32__ - .\cmd;.\serial;.\user;.\espfs;.\httpd;.\include;..\esp_iot_sdk_v1.3.0\include;C:\tools\mingw64\include + .\esp-link;.\mqtt;.\cmd;.\serial;.\user;.\espfs;.\httpd;.\include;..\esp_iot_sdk_v1.3.0\include;..\xtensa-lx106-elf\xtensa-lx106-elf\include;c:\tools\mingw64\x86_64-w64-mingw32\include;c:\tools\mingw64\lib\gcc\x86_64-w64-mingw32\4.8.3\include @@ -65,6 +65,7 @@ + @@ -75,12 +76,26 @@ + + + + + + + + + + + + + + @@ -93,6 +108,7 @@ + @@ -106,11 +122,24 @@ + + + + + + + + + + + + + diff --git a/user/cgi.c b/esp-link/cgi.c similarity index 100% rename from user/cgi.c rename to esp-link/cgi.c diff --git a/user/cgi.h b/esp-link/cgi.h similarity index 100% rename from user/cgi.h rename to esp-link/cgi.h diff --git a/user/cgiflash.c b/esp-link/cgiflash.c similarity index 100% rename from user/cgiflash.c rename to esp-link/cgiflash.c diff --git a/user/cgiflash.h b/esp-link/cgiflash.h similarity index 100% rename from user/cgiflash.h rename to esp-link/cgiflash.h diff --git a/user/cgipins.c b/esp-link/cgipins.c similarity index 100% rename from user/cgipins.c rename to esp-link/cgipins.c diff --git a/user/cgipins.h b/esp-link/cgipins.h similarity index 100% rename from user/cgipins.h rename to esp-link/cgipins.h diff --git a/user/cgitcp.c b/esp-link/cgitcp.c similarity index 100% rename from user/cgitcp.c rename to esp-link/cgitcp.c diff --git a/user/cgitcp.h b/esp-link/cgitcp.h similarity index 100% rename from user/cgitcp.h rename to esp-link/cgitcp.h diff --git a/user/cgiwifi.c b/esp-link/cgiwifi.c similarity index 100% rename from user/cgiwifi.c rename to esp-link/cgiwifi.c diff --git a/user/cgiwifi.h b/esp-link/cgiwifi.h similarity index 72% rename from user/cgiwifi.h rename to esp-link/cgiwifi.h index d73628e..3f7fe09 100644 --- a/user/cgiwifi.h +++ b/esp-link/cgiwifi.h @@ -4,6 +4,7 @@ #include "httpd.h" enum { wifiIsDisconnected, wifiIsConnected, wifiGotIP }; +typedef void(*WifiStateChangeCb)(uint8_t wifiStatus); int cgiWiFiScan(HttpdConnData *connData); int cgiWifiInfo(HttpdConnData *connData); @@ -13,8 +14,9 @@ int cgiWiFiSetMode(HttpdConnData *connData); int cgiWiFiConnStatus(HttpdConnData *connData); int cgiWiFiSpecial(HttpdConnData *connData); void wifiInit(void); +void wifiAddStateChangeCb(WifiStateChangeCb cb); extern uint8_t wifiState; -extern void (*wifiStatusCb)(uint8_t); // callback when wifi status changes +//extern void (*wifiStatusCb)(uint8_t); // callback when wifi status changes #endif diff --git a/user/config.c b/esp-link/config.c similarity index 100% rename from user/config.c rename to esp-link/config.c diff --git a/user/config.h b/esp-link/config.h similarity index 100% rename from user/config.h rename to esp-link/config.h diff --git a/user/log.c b/esp-link/log.c similarity index 100% rename from user/log.c rename to esp-link/log.c diff --git a/user/log.h b/esp-link/log.h similarity index 100% rename from user/log.h rename to esp-link/log.h diff --git a/esp-link/main.c b/esp-link/main.c new file mode 100644 index 0000000..5febf87 --- /dev/null +++ b/esp-link/main.c @@ -0,0 +1,168 @@ +/* +* ---------------------------------------------------------------------------- +* "THE BEER-WARE LICENSE" (Revision 42): +* Jeroen Domburg wrote this file. As long as you retain +* this notice you can do whatever you want with this stuff. If we meet some day, +* and you think this stuff is worth it, you can buy me a beer in return. +* ---------------------------------------------------------------------------- +* Heavily modified and enhanced by Thorsten von Eicken in 2015 +* ---------------------------------------------------------------------------- +*/ + + +#include +#include "httpd.h" +#include "httpdespfs.h" +#include "cgi.h" +#include "cgiwifi.h" +#include "cgipins.h" +#include "cgitcp.h" +#include "cgiflash.h" +#include "auth.h" +#include "espfs.h" +#include "uart.h" +#include "serbridge.h" +#include "status.h" +#include "serled.h" +#include "console.h" +#include "config.h" +#include "log.h" +#include + +//#define SHOW_HEAP_USE + +//Function that tells the authentication system what users/passwords live on the system. +//This is disabled in the default build; if you want to try it, enable the authBasic line in +//the builtInUrls below. +int myPassFn(HttpdConnData *connData, int no, char *user, int userLen, char *pass, int passLen) { + if (no == 0) { + os_strcpy(user, "admin"); + os_strcpy(pass, "s3cr3t"); + return 1; + //Add more users this way. Check against incrementing no for each user added. + // } else if (no==1) { + // os_strcpy(user, "user1"); + // os_strcpy(pass, "something"); + // return 1; + } + return 0; +} + + +/* +This is the main url->function dispatching data struct. +In short, it's a struct with various URLs plus their handlers. The handlers can +be 'standard' CGI functions you wrote, or 'special' CGIs requiring an argument. +They can also be auth-functions. An asterisk will match any url starting with +everything before the asterisks; "*" matches everything. The list will be +handled top-down, so make sure to put more specific rules above the more +general ones. Authorization things (like authBasic) act as a 'barrier' and +should be placed above the URLs they protect. +*/ +HttpdBuiltInUrl builtInUrls[] = { + { "/", cgiRedirect, "/home.html" }, + { "/menu", cgiMenu, NULL }, + { "/flash/next", cgiGetFirmwareNext, NULL }, + { "/flash/upload", cgiUploadFirmware, NULL }, + { "/flash/reboot", cgiRebootFirmware, NULL }, + //{"/home.html", cgiEspFsHtml, NULL}, + //{"/log.html", cgiEspFsHtml, NULL}, + { "/log/text", ajaxLog, NULL }, + { "/log/dbg", ajaxLogDbg, NULL }, + //{"/console.html", cgiEspFsHtml, NULL}, + { "/console/reset", ajaxConsoleReset, NULL }, + { "/console/baud", ajaxConsoleBaud, NULL }, + { "/console/text", ajaxConsole, NULL }, + + //Routines to make the /wifi URL and everything beneath it work. + + //Enable the line below to protect the WiFi configuration with an username/password combo. + // {"/wifi/*", authBasic, myPassFn}, + + { "/wifi", cgiRedirect, "/wifi/wifi.html" }, + { "/wifi/", cgiRedirect, "/wifi/wifi.html" }, + //{"/wifi/wifi.html", cgiEspFsHtml, NULL}, + { "/wifi/info", cgiWifiInfo, NULL }, + { "/wifi/scan", cgiWiFiScan, NULL }, + { "/wifi/connect", cgiWiFiConnect, NULL }, + { "/wifi/connstatus", cgiWiFiConnStatus, NULL }, + { "/wifi/setmode", cgiWiFiSetMode, NULL }, + { "/wifi/special", cgiWiFiSpecial, NULL }, + { "/pins", cgiPins, NULL }, + { "/tcpclient", cgiTcp, NULL }, + + { "*", cgiEspFsHook, NULL }, //Catch-all cgi function for the filesystem + { NULL, NULL, NULL } +}; + + +//#define SHOW_HEAP_USE + +#ifdef SHOW_HEAP_USE +static ETSTimer prHeapTimer; + +static void ICACHE_FLASH_ATTR prHeapTimerCb(void *arg) { + os_printf("Heap: %ld\n", (unsigned long)system_get_free_heap_size()); +} +#endif + +void user_rf_pre_init(void) { +} + +// address of espfs binary blob +extern uint32_t _binary_espfs_img_start; + +static char *rst_codes[] = { + "normal", "wdt reset", "exception", "soft wdt", "restart", "deep sleep", "external", +}; + +# define VERS_STR_STR(V) #V +# define VERS_STR(V) VERS_STR_STR(V) +char* esp_link_version = VERS_STR(VERSION); + +//Main routine. Initialize stdout, the I/O, filesystem and the webserver and we're done. +void user_init(void) { + // get the flash config so we know how to init things + //configWipe(); // uncomment to reset the config for testing purposes + bool restoreOk = configRestore(); + // init gpio pin registers + gpio_init(); + // init UART + uart_init(flashConfig.baud_rate, 115200); + logInit(); // must come after init of uart + // say hello (leave some time to cause break in TX after boot loader's msg + os_delay_us(10000L); + os_printf("\n\n** %s\n", esp_link_version); + os_printf("Flash config restore %s\n", restoreOk ? "ok" : "*FAILED*"); + // Status LEDs + statusInit(); + serledInit(); + // Wifi + wifiInit(); + // init the flash filesystem with the html stuff + espFsInit(&_binary_espfs_img_start); + //EspFsInitResult res = espFsInit(&_binary_espfs_img_start); + //os_printf("espFsInit %s\n", res?"ERR":"ok"); + // mount the http handlers + httpdInit(builtInUrls, 80); + // init the wifi-serial transparent bridge (port 23) + serbridgeInit(23); + uart_add_recv_cb(&serbridgeUartCb); +#ifdef SHOW_HEAP_USE + os_timer_disarm(&prHeapTimer); + os_timer_setfn(&prHeapTimer, prHeapTimerCb, NULL); + os_timer_arm(&prHeapTimer, 10000, 1); +#endif + + struct rst_info *rst_info = system_get_rst_info(); + os_printf("Reset cause: %d=%s\n", rst_info->reason, rst_codes[rst_info->reason]); + os_printf("exccause=%d epc1=0x%x epc2=0x%x epc3=0x%x excvaddr=0x%x depc=0x%x\n", + rst_info->exccause, rst_info->epc1, rst_info->epc2, rst_info->epc3, + rst_info->excvaddr, rst_info->depc); + os_printf("Flash map %d, chip %08X\n", system_get_flash_size_map(), spi_flash_get_id()); + + os_printf("** esp-link ready\n"); + + // call user_main init + init(); +} \ No newline at end of file diff --git a/user/status.c b/esp-link/status.c similarity index 100% rename from user/status.c rename to esp-link/status.c diff --git a/user/status.h b/esp-link/status.h similarity index 100% rename from user/status.h rename to esp-link/status.h diff --git a/httpd/httpdespfs.c b/httpd/httpdespfs.c index b6853aa..c6f2c0c 100644 --- a/httpd/httpdespfs.c +++ b/httpd/httpdespfs.c @@ -12,12 +12,7 @@ Connector to let httpd use the espfs filesystem to serve the files in it. * Modified and enhanced by Thorsten von Eicken in 2015 * ---------------------------------------------------------------------------- */ - -#include #include "httpdespfs.h" -#include "espfs.h" -#include "espfsformat.h" -#include "cgi.h" // The static files marked with FLAG_GZIP are compressed and will be served with GZIP compression. // If the client does not advertise that he accepts GZIP send following warning message (telnet users for e.g.) @@ -27,7 +22,8 @@ static const char *gzipNonSupportedMessage = "HTTP/1.0 501 Not implemented\r\nSe //This is a catch-all cgi function. It takes the url passed to it, looks up the corresponding //path in the filesystem and if it exists, passes the file through. This simulates what a normal //webserver would do with static files. -int ICACHE_FLASH_ATTR cgiEspFsHook(HttpdConnData *connData) { +int ICACHE_FLASH_ATTR +cgiEspFsHook(HttpdConnData *connData) { EspFsFile *file=connData->cgiData; int len; char buff[1024]; @@ -93,7 +89,8 @@ int ICACHE_FLASH_ATTR cgiEspFsHook(HttpdConnData *connData) { #if 0 //cgiEspFsHtml is a simple HTML file that gets prefixed by head.tpl -int ICACHE_FLASH_ATTR cgiEspFsHtml(HttpdConnData *connData) { +int ICACHE_FLASH_ATTR +cgiEspFsHtml(HttpdConnData *connData) { EspFsFile *file = connData->cgiData; char buff[2048]; diff --git a/httpd/httpdespfs.h b/httpd/httpdespfs.h index fb07008..847a8b6 100644 --- a/httpd/httpdespfs.h +++ b/httpd/httpdespfs.h @@ -1,10 +1,14 @@ #ifndef HTTPDESPFS_H #define HTTPDESPFS_H +#include +#include "espfs.h" +#include "espfsformat.h" +#include "cgi.h" #include "httpd.h" int cgiEspFsHook(HttpdConnData *connData); -int ICACHE_FLASH_ATTR cgiEspFsTemplate(HttpdConnData *connData); +//int cgiEspFsTemplate(HttpdConnData *connData); //int ICACHE_FLASH_ATTR cgiEspFsHtml(HttpdConnData *connData); #endif diff --git a/include/esp8266.h b/include/esp8266.h index 96a7364..c3d0c40 100644 --- a/include/esp8266.h +++ b/include/esp8266.h @@ -1,5 +1,5 @@ // Combined include file for esp8266 - +#include #include #include #include @@ -16,6 +16,9 @@ #include "espmissingincludes.h" #include "uart_hw.h" +extern char* esp_link_version; + +void ICACHE_FLASH_ATTR init(void); #ifdef __WIN32__ #include <_mingw.h> diff --git a/include/user_config.h b/include/user_config.h index 8b13789..7bdfd38 100644 --- a/include/user_config.h +++ b/include/user_config.h @@ -1 +1,20 @@ +#ifndef _USER_CONFIG_H_ +#define _USER_CONFIG_H_ +#define MQTT_RECONNECT_TIMEOUT 5 // seconds +#define MQTT_BUF_SIZE 1024 + +#define MQTT_HOST "10.0.0.220" // "mqtt.yourdomain.com" or ip "10.0.0.1" +#define MQTT_PORT 1883 +#define MQTT_SECURITY 0 + +#define MQTT_CLIENT_ID "esp-link" // "" +#define MQTT_USER "" +#define MQTT_PASS "" +#define MQTT_KEEPALIVE 120 // seconds +#define MQTT_CLSESSION true + +#define PROTOCOL_NAMEv31 // MQTT version 3.1 compatible with Mosquitto v0.15 +//PROTOCOL_NAMEv311 // MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/ + +#endif \ No newline at end of file diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c new file mode 100644 index 0000000..12b9514 --- /dev/null +++ b/mqtt/mqtt.c @@ -0,0 +1,635 @@ +/* mqtt.c +* Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "mqtt.h" + +#define MQTT_TASK_PRIO 0 +#define MQTT_TASK_QUEUE_SIZE 1 +#define MQTT_SEND_TIMOUT 5 + +#ifndef QUEUE_BUFFER_SIZE +#define QUEUE_BUFFER_SIZE 2048 +#endif + +unsigned char* default_certificate; +unsigned int default_certificate_len = 0; +unsigned char* default_private_key; +unsigned int default_private_key_len = 0; + +os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]; + +LOCAL void ICACHE_FLASH_ATTR +mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) { + struct espconn* pConn = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pConn->reverse; + + + if (ipaddr == NULL) { + os_printf("DNS: Found, but got no ip, try to reconnect\n"); + client->connState = TCP_RECONNECT_REQ; + return; + } + + os_printf("DNS: found ip %d.%d.%d.%d\n", + *((uint8 *)&ipaddr->addr), + *((uint8 *)&ipaddr->addr + 1), + *((uint8 *)&ipaddr->addr + 2), + *((uint8 *)&ipaddr->addr + 3)); + + if (client->ip.addr == 0 && ipaddr->addr != 0) { + os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); +#ifdef CLIENT_SSL_ENABLE + if (client->security){ + espconn_secure_connect(client->pCon); + } + else +#endif + espconn_connect(client->pCon); + + client->connState = TCP_CONNECTING; + os_printf("MQTT-TCP: connecting...\n"); + } + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + + +LOCAL void ICACHE_FLASH_ATTR +deliver_publish(MQTT_Client* client, uint8_t* message, uint16_t length) { + mqtt_event_data_t event_data; + + event_data.topic_length = length; + event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length); + event_data.data_length = length; + event_data.data = mqtt_get_publish_data(message, &event_data.data_length); + + if (client->dataCb) + client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length); + + if (client->cmdDataCb) + client->cmdDataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length); + +} + +/** +* @brief Client received callback function. +* @param arg: contain the ip link information +* @param pdata: received data +* @param len: the lenght of received data +* @retval None +*/ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) { + uint8_t msg_type; + uint8_t msg_qos; + uint16_t msg_id; + + struct espconn* pCon = (struct espconn*)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + +READPACKET: + os_printf("MQTT-TCP: Data received %d bytes\n", len); + if (len < MQTT_BUF_SIZE && len > 0) { + os_memcpy(client->mqtt_state.in_buffer, pdata, len); + + msg_type = mqtt_get_type(client->mqtt_state.in_buffer); + msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); + msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + if (client->connState == MQTT_CONNECT_SENDING) { + if (msg_type == MQTT_MSG_TYPE_CONNACK) { + if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT) { + os_printf("MQTT: Invalid packet\n"); +#ifdef CLIENT_SSL_ENABLE + if (client->security){ + espconn_secure_disconnect(client->pCon); + } + else +#endif + espconn_disconnect(client->pCon); + } + else { + os_printf("MQTT: Connected to %s:%ld\n", client->host, client->port); + client->connState = MQTT_DATA; + if (client->connectedCb) + client->connectedCb((uint32_t*)client); + if (client->cmdConnectedCb) + client->cmdConnectedCb((uint32_t*)client); + } + } + } + else if (client->connState == MQTT_DATA) { + client->mqtt_state.message_length_read = len; + client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + + + if (msg_type == MQTT_MSG_TYPE_SUBACK) { + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + os_printf("MQTT: Subscribe successful\n"); + } + else if (msg_type == MQTT_MSG_TYPE_UNSUBACK) { + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + os_printf("MQTT: UnSubscribe successful\n"); + } + else if (msg_type == MQTT_MSG_TYPE_PUBLISH) { + if (msg_qos == 1) + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + else if (msg_qos == 2) + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + if (msg_qos == 1 || msg_qos == 2) { + os_printf("MQTT: Queue response QoS: %d\n", msg_qos); + if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + } + } + deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + } + else if (msg_type == MQTT_MSG_TYPE_PUBACK) { + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { + os_printf("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\n"); + } + } + else if (msg_type == MQTT_MSG_TYPE_PUBREC) { + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + } + } + else if (msg_type == MQTT_MSG_TYPE_PUBREL) { + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + } + } + else if (msg_type == MQTT_MSG_TYPE_PUBCOMP) { + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) { + os_printf("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\n"); + } + } + else if (msg_type == MQTT_MSG_TYPE_PINGREQ) { + client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); + if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + } + } + + // NOTE: this is done down here and not in the switch case above + // because the PSOCK_READBUF_LEN() won't work inside a switch + // statement due to the way protothreads resume. + if (msg_type == MQTT_MSG_TYPE_PUBLISH) { + len = client->mqtt_state.message_length_read; + + if (client->mqtt_state.message_length < client->mqtt_state.message_length_read) { + //client->connState = MQTT_PUBLISH_RECV; + //Not Implement yet + len -= client->mqtt_state.message_length; + pdata += client->mqtt_state.message_length; + + os_printf("Get another published message\n"); + goto READPACKET; + } + } + } + } + else { + os_printf("ERROR: Message too long\n"); + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** +* @brief Client send over callback function. +* @param arg: contain the ip link information +* @retval None +*/ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_sent_cb(void* arg) { + struct espconn* pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + os_printf("MQTT-TCP: Sent\n"); + client->sendTimeout = 0; + if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) { + if (client->publishedCb) + client->publishedCb((uint32_t*)client); + if (client->cmdPublishedCb) + client->cmdPublishedCb((uint32_t*)client); + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +void ICACHE_FLASH_ATTR +mqtt_timer(void* arg) { + MQTT_Client* client = (MQTT_Client*)arg; + + if (client->connState == MQTT_DATA) { + client->keepAliveTick++; + if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) { + + os_printf("\nMQTT: Send keepalive packet to %s:%ld!\n", client->host, client->port); + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + os_printf("MQTT: Sending, type: %d, id: %04X\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); +#ifdef CLIENT_SSL_ENABLE + if (client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + else +#endif + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + client->mqtt_state.outbound_message = NULL; + + client->keepAliveTick = 0; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } + + } + else if (client->connState == TCP_RECONNECT_REQ) { + client->reconnectTick++; + if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) { + client->reconnectTick = 0; + client->connState = TCP_RECONNECT; + if (client->tcpDisconnectedCb) + client->tcpDisconnectedCb((uint32_t*)client); + if (client->cmdTcpDisconnectedCb) + client->cmdTcpDisconnectedCb((uint32_t*)client); + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } + } + if (client->sendTimeout > 0) + client->sendTimeout--; +} + +void ICACHE_FLASH_ATTR +mqtt_tcpclient_discon_cb(void* arg) { + + struct espconn* pespconn = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pespconn->reverse; + os_printf("MQTT-TCP: Disconnected callback\n"); + client->connState = TCP_RECONNECT_REQ; + if (client->disconnectedCb) + client->disconnectedCb((uint32_t*)client); + if (client->cmdDisconnectedCb) + client->cmdDisconnectedCb((uint32_t*)client); + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** +* @brief Tcp client connect success callback function. +* @param arg: contain the ip link information +* @retval None +*/ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_connect_cb(void* arg) { + struct espconn* pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + + espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); + espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);//////// + espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);/////// + os_printf("MQTT: Connected to broker %s:%ld\n", client->host, client->port); + + mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length); + client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + os_printf("MQTT: Sending, type: %d, id: %04X\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); +#ifdef CLIENT_SSL_ENABLE + if (client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + else +#endif + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + client->mqtt_state.outbound_message = NULL; + client->connState = MQTT_CONNECT_SENDING; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** +* @brief Tcp client connect repeat callback function. +* @param arg: contain the ip link information +* @retval None +*/ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_recon_cb(void* arg, int8_t errType) { + struct espconn* pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + + os_printf("MQTT-TCP: Reconnect to %s:%ld\n", client->host, client->port); + + client->connState = TCP_RECONNECT_REQ; + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** +* @brief MQTT publish function. +* @param client: MQTT_Client reference +* @param topic: string topic will publish to +* @param data: buffer data send point to +* @param data_length: length of data +* @param qos: qos +* @param retain: retain +* @retval TRUE if success queue +*/ +bool ICACHE_FLASH_ATTR +MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain) { + int data_length = os_strlen(data); + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + topic, data, data_length, + qos, retain, + &client->mqtt_state.pending_msg_id); + if (client->mqtt_state.outbound_message->length == 0) { + os_printf("MQTT: Queuing Publish failed\n"); + return FALSE; + } + os_printf("MQTT: Queuing Publish, length: %d, queue size(%ld/%ld)\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size); + while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + os_printf("MQTT: Serious buffer error\n"); + return FALSE; + } + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + return TRUE; +} + +/** +* @brief MQTT subscibe function. +* @param client: MQTT_Client reference +* @param topic: string topic will subscribe +* @param qos: qos +* @retval TRUE if success queue +*/ +bool ICACHE_FLASH_ATTR +MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) { + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + + client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, + topic, 0, + &client->mqtt_state.pending_msg_id); + os_printf("MQTT: Queue Subscribe, topic: \"%s\", id: %d\n", topic, client->mqtt_state.pending_msg_id); + while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + os_printf("MQTT: Queue full\n"); + if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + os_printf("MQTT: Serious buffer error\n"); + return FALSE; + } + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + return TRUE; +} + +void ICACHE_FLASH_ATTR +MQTT_Task(os_event_t* e) { + MQTT_Client* client = (MQTT_Client*)e->par; + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + if (e->par == 0) + return; + + if (client->connState == TCP_RECONNECT_REQ) { + return; + } + else if (client->connState == TCP_RECONNECT) { + MQTT_Connect(client); + os_printf("MQTT-TCP: Reconnect to: %s:%ld\n", client->host, client->port); + client->connState = TCP_CONNECTING; + } + else if (client->connState == MQTT_DATA) { + if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) + return; + + if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) { + client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer); + client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen); + client->sendTimeout = MQTT_SEND_TIMOUT; + os_printf("MQTT: Sending, type: %d, id: %04X\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); +#ifdef CLIENT_SSL_ENABLE + if (client->security){ + espconn_secure_sent(client->pCon, dataBuffer, dataLen); + } + else +#endif + espconn_sent(client->pCon, dataBuffer, dataLen); + + client->mqtt_state.outbound_message = NULL; + return; + } + return; + } +} + +/** +* @brief MQTT initialization connection function +* @param client: MQTT_Client reference +* @param host: Domain or IP string +* @param port: Port to connect +* @param security: 1 for ssl, 0 for none +* @retval None +*/ +void ICACHE_FLASH_ATTR +MQTT_InitConnection(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security) { + os_printf("MQTT_InitConnection\n"); + uint8_t len = sizeof(MQTT_Client); + os_memset(mqttClient, 0, len); + + uint32_t temp = os_strlen(host); + mqttClient->host = (char*)os_zalloc(temp + 1); + os_strcpy(mqttClient->host, host); + mqttClient->host[temp] = 0; + + mqttClient->port = port; + mqttClient->security = security; +} + +/** +* @brief MQTT initialization mqtt client function +* @param client: MQTT_Client reference +* @param clientid: MQTT client id +* @param client_user:MQTT client user +* @param client_pass:MQTT client password +* @param client_pass:MQTT keep alive timer, in second +* @retval None +*/ +void ICACHE_FLASH_ATTR +MQTT_InitClient(MQTT_Client* mqttClient, char* client_id, char* client_user, char* client_pass, uint8_t keepAliveTime, uint8_t cleanSession) { + uint32_t temp; + os_printf("MQTT_InitClient\n"); + + os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); + + temp = os_strlen(client_id); + mqttClient->connect_info.client_id = (char*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.client_id, client_id); + mqttClient->connect_info.client_id[temp] = 0; + + temp = os_strlen(client_user); + mqttClient->connect_info.username = (char*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.username, client_user); + mqttClient->connect_info.username[temp] = 0; + + temp = os_strlen(client_pass); + mqttClient->connect_info.password = (char*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.password, client_pass); + mqttClient->connect_info.password[temp] = 0; + + + mqttClient->connect_info.keepalive = keepAliveTime; + mqttClient->connect_info.clean_session = cleanSession; + + mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); + mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; + mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); + mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; + mqttClient->mqtt_state.connect_info = &mqttClient->connect_info; + + mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length); + + QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE); + + system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE); + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient); +} + +void ICACHE_FLASH_ATTR +MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, uint8_t will_qos, uint8_t will_retain) { + uint32_t temp; + temp = os_strlen((char*)will_topic); + mqttClient->connect_info.will_topic = (char*)os_zalloc(temp + 1); + os_strcpy((char*)mqttClient->connect_info.will_topic, (char*)will_topic); + mqttClient->connect_info.will_topic[temp] = 0; + + temp = os_strlen((char*)will_msg); + mqttClient->connect_info.will_message = (char*)os_zalloc(temp + 1); + os_strcpy((char*)mqttClient->connect_info.will_message, (char*)will_msg); + mqttClient->connect_info.will_message[temp] = 0; + + + mqttClient->connect_info.will_qos = will_qos; + mqttClient->connect_info.will_retain = will_retain; +} + +/** +* @brief Begin connect to MQTT broker +* @param client: MQTT_Client reference +* @retval None +*/ +void ICACHE_FLASH_ATTR +MQTT_Connect(MQTT_Client* mqttClient) { + MQTT_Disconnect(mqttClient); + mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); + mqttClient->pCon->type = ESPCONN_TCP; + mqttClient->pCon->state = ESPCONN_NONE; + mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); + mqttClient->pCon->proto.tcp->local_port = espconn_port(); + mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; + mqttClient->pCon->reverse = mqttClient; + espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); + espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); + + mqttClient->keepAliveTick = 0; + mqttClient->reconnectTick = 0; + + + os_timer_disarm(&mqttClient->mqttTimer); + os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); + os_timer_arm(&mqttClient->mqttTimer, 1000, 1); + + if (UTILS_StrToIP((const char *)mqttClient->host, (void*)&mqttClient->pCon->proto.tcp->remote_ip)) { + os_printf("MQTT-TCP: Connect to ip %s:%ld\n", mqttClient->host, mqttClient->port); +#ifdef CLIENT_SSL_ENABLE + if (mqttClient->security){ + espconn_secure_connect(mqttClient->pCon); + } + else +#endif + espconn_connect(mqttClient->pCon); + } + else { + os_printf("MQTT-TCP: Connect to domain %s:%ld\n", mqttClient->host, mqttClient->port); + espconn_gethostbyname(mqttClient->pCon, (const char *)mqttClient->host, &mqttClient->ip, mqtt_dns_found); + } + mqttClient->connState = TCP_CONNECTING; +} + +void ICACHE_FLASH_ATTR +MQTT_Disconnect(MQTT_Client* mqttClient) { + if (mqttClient->pCon) { + os_printf("Free memory\n"); + if (mqttClient->pCon->proto.tcp) + os_free(mqttClient->pCon->proto.tcp); + os_free(mqttClient->pCon); + mqttClient->pCon = NULL; + } + + os_timer_disarm(&mqttClient->mqttTimer); +} + +void ICACHE_FLASH_ATTR +MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb) { + mqttClient->connectedCb = connectedCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb) { + mqttClient->disconnectedCb = disconnectedCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnTcpDisconnected(MQTT_Client *mqttClient, MqttCallback tcpDisconnectedCb) +{ + mqttClient->tcpDisconnectedCb = tcpDisconnectedCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb) { + mqttClient->dataCb = dataCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb) { + mqttClient->publishedCb = publishedCb; +} diff --git a/mqtt/mqtt.h b/mqtt/mqtt.h new file mode 100644 index 0000000..9ad6825 --- /dev/null +++ b/mqtt/mqtt.h @@ -0,0 +1,144 @@ +/* mqtt.h +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ +#ifndef USER_AT_MQTT_H_ +#define USER_AT_MQTT_H_ + +#include +#include "mqtt_msg.h" +#include "queue.h" +#include + +typedef struct mqtt_event_data_t { + uint8_t type; + const char* topic; + const char* data; + uint16_t topic_length; + uint16_t data_length; + uint16_t data_offset; +} mqtt_event_data_t; + +typedef struct mqtt_state_t { + uint16_t port; + int auto_reconnect; + mqtt_connect_info_t* connect_info; + uint8_t* in_buffer; + uint8_t* out_buffer; + int in_buffer_length; + int out_buffer_length; + uint16_t message_length; + uint16_t message_length_read; + mqtt_message_t* outbound_message; + mqtt_connection_t mqtt_connection; + uint16_t pending_msg_id; + int pending_msg_type; + int pending_publish_qos; +} mqtt_state_t; + +typedef enum { + WIFI_INIT, + WIFI_CONNECTING, + WIFI_CONNECTING_ERROR, + WIFI_CONNECTED, + DNS_RESOLVE, + TCP_DISCONNECTED, + TCP_RECONNECT_REQ, + TCP_RECONNECT, + TCP_CONNECTING, + TCP_CONNECTING_ERROR, + TCP_CONNECTED, + MQTT_CONNECT_SEND, + MQTT_CONNECT_SENDING, + MQTT_SUBSCIBE_SEND, + MQTT_SUBSCIBE_SENDING, + MQTT_DATA, + MQTT_PUBLISH_RECV, + MQTT_PUBLISHING +} tConnState; + +typedef void (*MqttCallback)(uint32_t* args); +typedef void (*MqttDataCallback)(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t lengh); + +typedef struct { + struct espconn* pCon; + uint8_t security; + char* host; + uint32_t port; + ip_addr_t ip; + mqtt_state_t mqtt_state; + mqtt_connect_info_t connect_info; + MqttCallback connectedCb; + MqttCallback cmdConnectedCb; + MqttCallback disconnectedCb; + MqttCallback cmdDisconnectedCb; + MqttCallback tcpDisconnectedCb; + MqttCallback cmdTcpDisconnectedCb; + MqttCallback publishedCb; + MqttCallback cmdPublishedCb; + MqttDataCallback dataCb; + MqttDataCallback cmdDataCb; + ETSTimer mqttTimer; + uint32_t keepAliveTick; + uint32_t reconnectTick; + uint32_t sendTimeout; + tConnState connState; + QUEUE msgQueue; + void* user_data; +} MQTT_Client; + +#define SEC_NONSSL 0 +#define SEC_SSL 1 + +#define MQTT_FLAG_CONNECTED 1 +#define MQTT_FLAG_READY 2 +#define MQTT_FLAG_EXIT 4 + +#define MQTT_EVENT_TYPE_NONE 0 +#define MQTT_EVENT_TYPE_CONNECTED 1 +#define MQTT_EVENT_TYPE_DISCONNECTED 2 +#define MQTT_EVENT_TYPE_SUBSCRIBED 3 +#define MQTT_EVENT_TYPE_UNSUBSCRIBED 4 +#define MQTT_EVENT_TYPE_PUBLISH 5 +#define MQTT_EVENT_TYPE_PUBLISHED 6 +#define MQTT_EVENT_TYPE_EXITED 7 +#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8 + +void MQTT_InitConnection(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security); +void MQTT_InitClient(MQTT_Client* mqttClient, char* client_id, char* client_user, char* client_pass, uint8_t keepAliveTime, uint8_t cleanSession); +void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, uint8_t will_qos, uint8_t will_retain); +void MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb); +void MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb); +void MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb); +void MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb); +bool MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos); +void MQTT_Connect(MQTT_Client* mqttClient); +void MQTT_Disconnect(MQTT_Client* mqttClient); +bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain); + +#endif /* USER_AT_MQTT_H_ */ diff --git a/mqtt/mqtt_msg.c b/mqtt/mqtt_msg.c new file mode 100644 index 0000000..03da28e --- /dev/null +++ b/mqtt/mqtt_msg.c @@ -0,0 +1,451 @@ +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#include "mqtt_msg.h" +#define MQTT_MAX_FIXED_HEADER_SIZE 3 + +enum mqtt_connect_flag { + MQTT_CONNECT_FLAG_USERNAME = 1 << 7, + MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, + MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, + MQTT_CONNECT_FLAG_WILL = 1 << 2, + MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 +}; + +struct + __attribute__((__packed__)) mqtt_connect_variable_header { + uint8_t lengthMsb; + uint8_t lengthLsb; +#if defined(PROTOCOL_NAMEv31) + uint8_t magic[6]; +#elif defined(PROTOCOL_NAMEv311) + uint8_t magic[4]; +#else +#error "Please define protocol name" +#endif + uint8_t version; + uint8_t flags; + uint8_t keepaliveMsb; + uint8_t keepaliveLsb; +}; + +static int ICACHE_FLASH_ATTR +append_string(mqtt_connection_t* connection, const char* string, int len) { + if (connection->message.length + len + 2 > connection->buffer_length) + return -1; + + connection->buffer[connection->message.length++] = len >> 8; + connection->buffer[connection->message.length++] = len & 0xff; + memcpy(connection->buffer + connection->message.length, string, len); + connection->message.length += len; + + return len + 2; +} + +static uint16_t ICACHE_FLASH_ATTR +append_message_id(mqtt_connection_t* connection, uint16_t message_id) { + // If message_id is zero then we should assign one, otherwise + // we'll use the one supplied by the caller + while (message_id == 0) + message_id = ++connection->message_id; + + if (connection->message.length + 2 > connection->buffer_length) + return 0; + + connection->buffer[connection->message.length++] = message_id >> 8; + connection->buffer[connection->message.length++] = message_id & 0xff; + + return message_id; +} + +static int ICACHE_FLASH_ATTR +init_message(mqtt_connection_t* connection) { + connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + return MQTT_MAX_FIXED_HEADER_SIZE; +} + +static mqtt_message_t* ICACHE_FLASH_ATTR +fail_message(mqtt_connection_t* connection) { + connection->message.data = connection->buffer; + connection->message.length = 0; + return &connection->message; +} + +static mqtt_message_t* ICACHE_FLASH_ATTR +fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) { + int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + + if (remaining_length > 127) { + connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[1] = 0x80 | (remaining_length % 128); + connection->buffer[2] = remaining_length / 128; + connection->message.length = remaining_length + 3; + connection->message.data = connection->buffer; + } + else { + connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[2] = remaining_length; + connection->message.length = remaining_length + 2; + connection->message.data = connection->buffer + 1; + } + + return &connection->message; +} + +void ICACHE_FLASH_ATTR +mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length) { + uint8_t len = sizeof(connection); + memset(connection, '\0', len); + connection->buffer = buffer; + connection->buffer_length = buffer_length; +} + +int ICACHE_FLASH_ATTR +mqtt_get_total_length(uint8_t* buffer, uint16_t length) { + int i; + int totlen = 0; + + for (i = 1; i < length; ++i) { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if ((buffer[i] & 0x80) == 0) { + ++i; + break; + } + } + totlen += i; + + return totlen; +} + +const char* ICACHE_FLASH_ATTR +mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) { + int i; + int totlen = 0; + int topiclen; + + for (i = 1; i < *length; ++i) { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if ((buffer[i] & 0x80) == 0) { + ++i; + break; + } + } + totlen += i; + + if (i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if (i + topiclen > *length) + return NULL; + + *length = topiclen; + return (const char*)(buffer + i); +} + +const char* ICACHE_FLASH_ATTR +mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) { + int i; + int totlen = 0; + int topiclen; + + for (i = 1; i < *length; ++i) { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if ((buffer[i] & 0x80) == 0) { + ++i; + break; + } + } + totlen += i; + + if (i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if (i + topiclen >= *length) { + *length = 0; + return NULL; + } + i += topiclen; + + if (mqtt_get_qos(buffer) > 0) { + if (i + 2 >= *length) + return NULL; + i += 2; + } + + if (totlen < i) + return NULL; + + if (totlen <= *length) + *length = totlen - i; + else + *length = *length - i; + return (const char*)(buffer + i); +} + +uint16_t ICACHE_FLASH_ATTR +mqtt_get_id(uint8_t* buffer, uint16_t length) { + if (length < 1) + return 0; + + switch (mqtt_get_type(buffer)) { + case MQTT_MSG_TYPE_PUBLISH: { + int i; + int topiclen; + + for (i = 1; i < length; ++i) { + if ((buffer[i] & 0x80) == 0) { + ++i; + break; + } + } + + if (i + 2 >= length) + return 0; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if (i + topiclen >= length) + return 0; + i += topiclen; + + if (mqtt_get_qos(buffer) > 0) { + if (i + 2 >= length) + return 0; + //i += 2; + } + else { + return 0; + } + + return (buffer[i] << 8) | buffer[i + 1]; + } + case MQTT_MSG_TYPE_PUBACK: + case MQTT_MSG_TYPE_PUBREC: + case MQTT_MSG_TYPE_PUBREL: + case MQTT_MSG_TYPE_PUBCOMP: + case MQTT_MSG_TYPE_SUBACK: + case MQTT_MSG_TYPE_UNSUBACK: + case MQTT_MSG_TYPE_SUBSCRIBE: { + // This requires the remaining length to be encoded in 1 byte, + // which it should be. + if (length >= 4 && (buffer[1] & 0x80) == 0) + return (buffer[2] << 8) | buffer[3]; + else + return 0; + } + + default: + return 0; + } +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info) { + struct mqtt_connect_variable_header* variable_header; + + init_message(connection); + + if (connection->message.length + sizeof(*variable_header) > connection->buffer_length) + return fail_message(connection); + variable_header = (void*)(connection->buffer + connection->message.length); + connection->message.length += sizeof(*variable_header); + + variable_header->lengthMsb = 0; +#if defined(PROTOCOL_NAMEv31) + variable_header->lengthLsb = 6; + memcpy(variable_header->magic, "MQIsdp", 6); + variable_header->version = 3; +#elif defined(PROTOCOL_NAMEv311) + variable_header->lengthLsb = 4; + memcpy(variable_header->magic, "MQTT", 4); + variable_header->version = 4; +#else +#error "Please define protocol name" +#endif + + variable_header->flags = 0; + variable_header->keepaliveMsb = info->keepalive >> 8; + variable_header->keepaliveLsb = info->keepalive & 0xff; + + if (info->clean_session) + variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + + if (info->client_id != NULL && info->client_id[0] != '\0') { + if (append_string(connection, info->client_id, strlen(info->client_id)) < 0) + return fail_message(connection); + } + else + return fail_message(connection); + + if (info->will_topic != NULL && info->will_topic[0] != '\0') { + if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) + return fail_message(connection); + + if (append_string(connection, info->will_message, strlen(info->will_message)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_WILL; + if (info->will_retain) + variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + variable_header->flags |= (info->will_qos & 3) << 3; + } + + if (info->username != NULL && info->username[0] != '\0') { + if (append_string(connection, info->username, strlen(info->username)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; + } + + if (info->password != NULL && info->password[0] != '\0') { + if (append_string(connection, info->password, strlen(info->password)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; + } + + return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id) { + init_message(connection); + + if (topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if (append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if (qos > 0) { + if ((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + } + else + *message_id = 0; + + if (connection->message.length + data_length > connection->buffer_length) + return fail_message(connection); + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + + return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id) { + init_message(connection); + if (append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id) { + init_message(connection); + if (append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id) { + init_message(connection); + if (append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id) { + init_message(connection); + if (append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) { + init_message(connection); + + if (topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if ((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if (append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if (connection->message.length + 1 > connection->buffer_length) + return fail_message(connection); + connection->buffer[connection->message.length++] = qos; + + return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) { + init_message(connection); + + if (topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if ((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if (append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_pingreq(mqtt_connection_t* connection) { + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_pingresp(mqtt_connection_t* connection) { + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR +mqtt_msg_disconnect(mqtt_connection_t* connection) { + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); +} diff --git a/mqtt/mqtt_msg.h b/mqtt/mqtt_msg.h new file mode 100644 index 0000000..bf305df --- /dev/null +++ b/mqtt/mqtt_msg.h @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#ifndef MQTT_MSG_H +#define MQTT_MSG_H +#include + +enum mqtt_message_type { + MQTT_MSG_TYPE_CONNECT = 1, + MQTT_MSG_TYPE_CONNACK = 2, + MQTT_MSG_TYPE_PUBLISH = 3, + MQTT_MSG_TYPE_PUBACK = 4, + MQTT_MSG_TYPE_PUBREC = 5, + MQTT_MSG_TYPE_PUBREL = 6, + MQTT_MSG_TYPE_PUBCOMP = 7, + MQTT_MSG_TYPE_SUBSCRIBE = 8, + MQTT_MSG_TYPE_SUBACK = 9, + MQTT_MSG_TYPE_UNSUBSCRIBE = 10, + MQTT_MSG_TYPE_UNSUBACK = 11, + MQTT_MSG_TYPE_PINGREQ = 12, + MQTT_MSG_TYPE_PINGRESP = 13, + MQTT_MSG_TYPE_DISCONNECT = 14 +}; + +typedef struct mqtt_message { + uint8_t* data; + uint16_t length; + +} mqtt_message_t; + +typedef struct mqtt_connection { + mqtt_message_t message; + + uint16_t message_id; + uint8_t* buffer; + uint16_t buffer_length; + +} mqtt_connection_t; + +typedef struct mqtt_connect_info { + char* client_id; + char* username; + char* password; + char* will_topic; + char* will_message; + uint32_t keepalive; + uint8_t will_qos; + uint8_t will_retain; + uint8_t clean_session; + +} mqtt_connect_info_t; + + +static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) { + return (buffer[0] & 0xf0) >> 4; +} + +static inline int ICACHE_FLASH_ATTR mqtt_get_dup(uint8_t* buffer) { + return (buffer[0] & 0x08) >> 3; +} + +static inline int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t* buffer) { + return (buffer[0] & 0x06) >> 1; +} + +static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { + return (buffer[0] & 0x01); +} + +void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +int mqtt_get_total_length(uint8_t* buffer, uint16_t length); +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); + +mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); +mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); +mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); +mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); + +#endif // MQTT_MSG_H + diff --git a/mqtt/proto.c b/mqtt/proto.c new file mode 100644 index 0000000..69b5367 --- /dev/null +++ b/mqtt/proto.c @@ -0,0 +1,86 @@ +#include "proto.h" + +int8_t ICACHE_FLASH_ATTR +PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize) { + parser->buf = buf; + parser->bufSize = bufSize; + parser->dataLen = 0; + parser->callback = completeCallback; + parser->isEsc = 0; + return 0; +} + +int8_t ICACHE_FLASH_ATTR +PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value) { + switch (value) { + case 0x7D: + parser->isEsc = 1; + break; + + case 0x7E: + parser->dataLen = 0; + parser->isEsc = 0; + parser->isBegin = 1; + break; + + case 0x7F: + if (parser->callback != NULL) + parser->callback(); + parser->isBegin = 0; + return 0; + break; + + default: + if (parser->isBegin == 0) break; + + if (parser->isEsc) { + value ^= 0x20; + parser->isEsc = 0; + } + + if (parser->dataLen < parser->bufSize) + parser->buf[parser->dataLen++] = value; + + break; + } + return -1; +} + +int16_t ICACHE_FLASH_ATTR +PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen) { + uint8_t c; + + PROTO_PARSER proto; + PROTO_Init(&proto, NULL, bufOut, maxBufLen); + while (RINGBUF_Get(rb, &c) == 0) { + if (PROTO_ParseByte(&proto, c) == 0) { + *len = proto.dataLen; + return 0; + } + } + return -1; +} + +int16_t ICACHE_FLASH_ATTR +PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len) { + uint16_t i = 2; + if (RINGBUF_Put(rb, 0x7E) == -1) return -1; + while (len--) { + switch (*packet) { + case 0x7D: + case 0x7E: + case 0x7F: + if (RINGBUF_Put(rb, 0x7D) == -1) return -1; + if (RINGBUF_Put(rb, *packet++ ^ 0x20) == -1) return -1; + i += 2; + break; + default: + if (RINGBUF_Put(rb, *packet++) == -1) return -1; + i++; + break; + } + } + if (RINGBUF_Put(rb, 0x7F) == -1) return -1; + + return i; +} diff --git a/mqtt/proto.h b/mqtt/proto.h new file mode 100644 index 0000000..67b3e26 --- /dev/null +++ b/mqtt/proto.h @@ -0,0 +1,21 @@ +#ifndef _PROTO_H_ +#define _PROTO_H_ +#include +#include "ringbuf.h" + +typedef void (PROTO_PARSE_CALLBACK)(); + +typedef struct { + uint8_t* buf; + uint16_t bufSize; + uint16_t dataLen; + uint8_t isEsc; + uint8_t isBegin; + PROTO_PARSE_CALLBACK* callback; +} PROTO_PARSER; + +int8_t PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize); +int16_t PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len); +int8_t PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value); +int16_t PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen); +#endif diff --git a/mqtt/queue.c b/mqtt/queue.c new file mode 100644 index 0000000..39c4790 --- /dev/null +++ b/mqtt/queue.c @@ -0,0 +1,53 @@ +/* str_queue.c +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ +#include "queue.h" + +void ICACHE_FLASH_ATTR +QUEUE_Init(QUEUE* queue, int bufferSize) { + queue->buf = (uint8_t*)os_zalloc(bufferSize); + RINGBUF_Init(&queue->rb, queue->buf, bufferSize); +} + +int32_t ICACHE_FLASH_ATTR +QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len) { + return PROTO_AddRb(&queue->rb, buffer, len); +} + +int32_t ICACHE_FLASH_ATTR +QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen) { + return PROTO_ParseRb(&queue->rb, buffer, len, maxLen); +} + +bool ICACHE_FLASH_ATTR +QUEUE_IsEmpty(QUEUE* queue) { + if (queue->rb.fill_cnt <= 0) + return TRUE; + return FALSE; +} diff --git a/mqtt/queue.h b/mqtt/queue.h new file mode 100644 index 0000000..bdbc503 --- /dev/null +++ b/mqtt/queue.h @@ -0,0 +1,46 @@ +/* str_queue.h -- +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef USER_QUEUE_H_ +#define USER_QUEUE_H_ +#include +#include "proto.h" +#include "ringbuf.h" + +typedef struct { + uint8_t* buf; + RINGBUF rb; +} QUEUE; + +void QUEUE_Init(QUEUE* queue, int bufferSize); +int32_t QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len); +int32_t QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen); +bool QUEUE_IsEmpty(QUEUE* queue); +#endif /* USER_QUEUE_H_ */ diff --git a/mqtt/ringbuf.c b/mqtt/ringbuf.c new file mode 100644 index 0000000..a0cc782 --- /dev/null +++ b/mqtt/ringbuf.c @@ -0,0 +1,63 @@ +#include "ringbuf.h" + +/** +* \brief init a RINGBUF object +* \param r pointer to a RINGBUF object +* \param buf pointer to a byte array +* \param size size of buf +* \return 0 if successfull, otherwise failed +*/ +int16_t ICACHE_FLASH_ATTR +RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size) { + if (r == NULL || buf == NULL || size < 2) return -1; + + r->p_o = r->p_r = r->p_w = buf; + r->fill_cnt = 0; + r->size = size; + + return 0; +} + +/** +* \brief put a character into ring buffer +* \param r pointer to a ringbuf object +* \param c character to be put +* \return 0 if successfull, otherwise failed +*/ +int16_t ICACHE_FLASH_ATTR +RINGBUF_Put(RINGBUF* r, uint8_t c) { + if (r->fill_cnt >= r->size)return -1; // ring buffer is full, this should be atomic operation + + + r->fill_cnt++; // increase filled slots count, this should be atomic operation + + + *r->p_w++ = c; // put character into buffer + + if (r->p_w >= r->p_o + r->size) // rollback if write pointer go pass + r->p_w = r->p_o; // the physical boundary + + return 0; +} + +/** +* \brief get a character from ring buffer +* \param r pointer to a ringbuf object +* \param c read character +* \return 0 if successfull, otherwise failed +*/ +int16_t ICACHE_FLASH_ATTR +RINGBUF_Get(RINGBUF* r, uint8_t* c) { + if (r->fill_cnt <= 0)return -1; // ring buffer is empty, this should be atomic operation + + + r->fill_cnt--; // decrease filled slots count + + + *c = *r->p_r++; // get the character out + + if (r->p_r >= r->p_o + r->size) // rollback if write pointer go pass + r->p_r = r->p_o; // the physical boundary + + return 0; +} diff --git a/mqtt/ringbuf.h b/mqtt/ringbuf.h new file mode 100644 index 0000000..d504d53 --- /dev/null +++ b/mqtt/ringbuf.h @@ -0,0 +1,17 @@ +#ifndef _RING_BUF_H_ +#define _RING_BUF_H_ + +#include + +typedef struct { + uint8_t* p_o; /**< Original pointer */ + uint8_t* volatile p_r; /**< Read pointer */ + uint8_t* volatile p_w; /**< Write pointer */ + volatile int32_t fill_cnt; /**< Number of filled slots */ + int32_t size; /**< Buffer size */ +} RINGBUF; + +int16_t RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size); +int16_t RINGBUF_Put(RINGBUF* r, uint8_t c); +int16_t RINGBUF_Get(RINGBUF* r, uint8_t* c); +#endif diff --git a/user/user_main.c b/user/user_main.c index 54ca59e..5738807 100644 --- a/user/user_main.c +++ b/user/user_main.c @@ -1,165 +1,70 @@ -/* - * ---------------------------------------------------------------------------- - * "THE BEER-WARE LICENSE" (Revision 42): - * Jeroen Domburg wrote this file. As long as you retain - * this notice you can do whatever you want with this stuff. If we meet some day, - * and you think this stuff is worth it, you can buy me a beer in return. - * ---------------------------------------------------------------------------- - * Heavily modified and enhanced by Thorsten von Eicken in 2015 - * ---------------------------------------------------------------------------- - */ - - #include -#include "httpd.h" -#include "httpdespfs.h" -#include "cgi.h" -#include "cgiwifi.h" -#include "cgipins.h" -#include "cgitcp.h" -#include "cgiflash.h" -#include "auth.h" -#include "espfs.h" -#include "uart.h" -#include "serbridge.h" -#include "status.h" -#include "serled.h" -#include "console.h" -#include "config.h" -#include "log.h" -#include +#include +#include -//#define SHOW_HEAP_USE +MQTT_Client mqttClient; -//Function that tells the authentication system what users/passwords live on the system. -//This is disabled in the default build; if you want to try it, enable the authBasic line in -//the builtInUrls below. -int myPassFn(HttpdConnData *connData, int no, char *user, int userLen, char *pass, int passLen) { - if (no==0) { - os_strcpy(user, "admin"); - os_strcpy(pass, "s3cr3t"); - return 1; -//Add more users this way. Check against incrementing no for each user added. -// } else if (no==1) { -// os_strcpy(user, "user1"); -// os_strcpy(pass, "something"); -// return 1; - } - return 0; +void ICACHE_FLASH_ATTR +mqttConnectedCb(uint32_t *args) { + MQTT_Client* client = (MQTT_Client*)args; + MQTT_Publish(client, "announce/all", "Hello World!", 0, 0); } +void ICACHE_FLASH_ATTR +mqttDisconnectedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT Disconnected\n"); +} -/* -This is the main url->function dispatching data struct. -In short, it's a struct with various URLs plus their handlers. The handlers can -be 'standard' CGI functions you wrote, or 'special' CGIs requiring an argument. -They can also be auth-functions. An asterisk will match any url starting with -everything before the asterisks; "*" matches everything. The list will be -handled top-down, so make sure to put more specific rules above the more -general ones. Authorization things (like authBasic) act as a 'barrier' and -should be placed above the URLs they protect. -*/ -HttpdBuiltInUrl builtInUrls[]={ - {"/", cgiRedirect, "/home.html"}, - {"/menu", cgiMenu, NULL}, - {"/flash/next", cgiGetFirmwareNext, NULL}, - {"/flash/upload", cgiUploadFirmware, NULL}, - {"/flash/reboot", cgiRebootFirmware, NULL}, - //{"/home.html", cgiEspFsHtml, NULL}, - //{"/log.html", cgiEspFsHtml, NULL}, - {"/log/text", ajaxLog, NULL}, - {"/log/dbg", ajaxLogDbg, NULL}, - //{"/console.html", cgiEspFsHtml, NULL}, - {"/console/reset", ajaxConsoleReset, NULL}, - {"/console/baud", ajaxConsoleBaud, NULL}, - {"/console/text", ajaxConsole, NULL}, - - //Routines to make the /wifi URL and everything beneath it work. - -//Enable the line below to protect the WiFi configuration with an username/password combo. -// {"/wifi/*", authBasic, myPassFn}, - - {"/wifi", cgiRedirect, "/wifi/wifi.html"}, - {"/wifi/", cgiRedirect, "/wifi/wifi.html"}, - //{"/wifi/wifi.html", cgiEspFsHtml, NULL}, - {"/wifi/info", cgiWifiInfo, NULL}, - {"/wifi/scan", cgiWiFiScan, NULL}, - {"/wifi/connect", cgiWiFiConnect, NULL}, - {"/wifi/connstatus", cgiWiFiConnStatus, NULL}, - {"/wifi/setmode", cgiWiFiSetMode, NULL}, - {"/wifi/special", cgiWiFiSpecial, NULL}, - {"/pins", cgiPins, NULL}, - {"/tcpclient", cgiTcp, NULL}, - - {"*", cgiEspFsHook, NULL}, //Catch-all cgi function for the filesystem - {NULL, NULL, NULL} -}; - - -//#define SHOW_HEAP_USE - -#ifdef SHOW_HEAP_USE -static ETSTimer prHeapTimer; - -static void ICACHE_FLASH_ATTR prHeapTimerCb(void *arg) { - os_printf("Heap: %ld\n", (unsigned long)system_get_free_heap_size()); +void ICACHE_FLASH_ATTR +mqttTcpDisconnectedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT TCP Disconnected\n"); } -#endif -void user_rf_pre_init(void) { +void ICACHE_FLASH_ATTR +mqttPublishedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT Published\n"); } -// address of espfs binary blob -extern uint32_t _binary_espfs_img_start; +void ICACHE_FLASH_ATTR +mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len) { + char *topicBuf = (char*)os_zalloc(topic_len + 1); + char *dataBuf = (char*)os_zalloc(data_len + 1); -static char *rst_codes[] = { - "normal", "wdt reset", "exception", "soft wdt", "restart", "deep sleep", "external", -}; +// MQTT_Client* client = (MQTT_Client*)args; -# define VERS_STR_STR(V) #V -# define VERS_STR(V) VERS_STR_STR(V) -char *esp_link_version = VERS_STR(VERSION); + os_memcpy(topicBuf, topic, topic_len); + topicBuf[topic_len] = 0; -//Main routine. Initialize stdout, the I/O, filesystem and the webserver and we're done. -void user_init(void) { - // get the flash config so we know how to init things - //configWipe(); // uncomment to reset the config for testing purposes - bool restoreOk = configRestore(); - // init gpio pin registers - gpio_init(); - // init UART - uart_init(flashConfig.baud_rate, 115200); - logInit(); // must come after init of uart - // say hello (leave some time to cause break in TX after boot loader's msg - os_delay_us(10000L); - os_printf("\n\n** %s\n", esp_link_version); - os_printf("Flash config restore %s\n", restoreOk ? "ok" : "*FAILED*"); - // Status LEDs - statusInit(); - serledInit(); - // Wifi - wifiInit(); - // init the flash filesystem with the html stuff - espFsInit(&_binary_espfs_img_start); - //EspFsInitResult res = espFsInit(&_binary_espfs_img_start); - //os_printf("espFsInit %s\n", res?"ERR":"ok"); - // mount the http handlers - httpdInit(builtInUrls, 80); - // init the wifi-serial transparent bridge (port 23) - serbridgeInit(23); - uart_add_recv_cb(&serbridgeUartCb); -#ifdef SHOW_HEAP_USE - os_timer_disarm(&prHeapTimer); - os_timer_setfn(&prHeapTimer, prHeapTimerCb, NULL); - os_timer_arm(&prHeapTimer, 10000, 1); -#endif + os_memcpy(dataBuf, data, data_len); + dataBuf[data_len] = 0; - struct rst_info *rst_info = system_get_rst_info(); - os_printf("Reset cause: %d=%s\n", rst_info->reason, rst_codes[rst_info->reason]); - os_printf("exccause=%d epc1=0x%x epc2=0x%x epc3=0x%x excvaddr=0x%x depc=0x%x\n", - rst_info->exccause, rst_info->epc1, rst_info->epc2, rst_info->epc3, - rst_info->excvaddr, rst_info->depc); - os_printf("Flash map %d, chip %08X\n", system_get_flash_size_map(), spi_flash_get_id()); + os_printf("Receive topic: %s, data: %s\n", topicBuf, dataBuf); + os_free(topicBuf); + os_free(dataBuf); +} - os_printf("** esp-link ready\n"); +void ICACHE_FLASH_ATTR +wifiStateChangeCb(uint8_t status) +{ + if (status == wifiGotIP && mqttClient.connState != TCP_CONNECTING){ + MQTT_Connect(&mqttClient); + } + else if (status == wifiIsDisconnected && mqttClient.connState == TCP_CONNECTING){ + MQTT_Disconnect(&mqttClient); + } } + +void init() { + wifiAddStateChangeCb(wifiStateChangeCb); + MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY); + MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION); + MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0); + MQTT_OnConnected(&mqttClient, mqttConnectedCb); + MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); + MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb); + MQTT_OnPublished(&mqttClient, mqttPublishedCb); + MQTT_OnData(&mqttClient, mqttDataCb); +} \ No newline at end of file