From 36303a2b67dd6fe78519b54719c8f015717593c6 Mon Sep 17 00:00:00 2001 From: Benjamin Runnels Date: Sat, 29 Aug 2015 15:48:51 -0500 Subject: [PATCH] finished mqtt --- cmd/handlers.c | 6 +- cmd/mqtt_cmd.c | 30 ++++----- cmd/mqtt_cmd.h | 12 ++-- cmd/rest.c | 2 - cmd/rest.h | 1 + esp-link.vcxproj | 4 +- esp-link/cgiwifi.h | 4 +- esp-link/main.c | 2 +- httpd/httpdespfs.c | 11 ++-- httpd/httpdespfs.h | 6 +- include/esp8266.h | 1 + include/user_config.h | 42 ++++--------- mqtt/mqtt.c | 32 +++++----- mqtt/mqtt.h | 28 ++++----- mqtt/mqtt_msg.h | 40 ++++++------ mqtt/proto.h | 15 ++--- mqtt/queue.h | 8 +-- mqtt/ringbuf.h | 6 +- mqtt/utils.c | 140 ------------------------------------------ mqtt/utils.h | 9 --- user/user_main.c | 67 +++++++++++++++++++- 21 files changed, 181 insertions(+), 285 deletions(-) delete mode 100644 mqtt/utils.c delete mode 100644 mqtt/utils.h diff --git a/cmd/handlers.c b/cmd/handlers.c index ae21bec..d5359cb 100644 --- a/cmd/handlers.c +++ b/cmd/handlers.c @@ -17,6 +17,7 @@ static uint32_t ICACHE_FLASH_ATTR CMD_WifiConnect(CmdPacket *cmd); static uint32_t ICACHE_FLASH_ATTR CMD_AddSensor(CmdPacket *cmd); static uint8_t lastWifiStatus = wifiIsDisconnected; +static bool wifiCbAdded = false; // Command dispatch table for serial -> ESP commands const CmdList commands[] = { @@ -127,7 +128,10 @@ CMD_WifiConnect(CmdPacket *cmd) { if(cmd->argc != 2 || cmd->callback == 0) return 0; - wifiStatusCb = CMD_WifiCb; // register our callback with wifi subsystem + if (!wifiCbAdded) { + wifiAddStateChangeCb(CMD_WifiCb); // register our callback with wifi subsystem + wifiCbAdded = true; + } CMD_AddCb("wifiCb", (uint32_t)cmd->callback); // save the MCU's callback lastWifiStatus = wifiIsDisconnected; CMD_WifiCb(wifiState); diff --git a/cmd/mqtt_cmd.c b/cmd/mqtt_cmd.c index d12ba21..358ad48 100644 --- a/cmd/mqtt_cmd.c +++ b/cmd/mqtt_cmd.c @@ -1,9 +1,9 @@ #include "mqtt_cmd.h" -uint32_t connectedCb = 0, disconnectCb = 0, publishedCb = 0, dataCb = 0; +uint32_t connectedCb = 0, disconnectCb = 0, tcpDisconnectedCb = 0, publishedCb = 0, dataCb = 0; void ICACHE_FLASH_ATTR -mqttConnectedCb(uint32_t* args) { +cmdMqttConnectedCb(uint32_t* args) { MQTT_Client* client = (MQTT_Client*)args; MqttCmdCb* cb = (MqttCmdCb*)client->user_data; os_printf("MQTT: Connected connectedCb=%p, disconnectedCb=%p, publishedCb=%p, dataCb=%p\n", @@ -16,7 +16,7 @@ mqttConnectedCb(uint32_t* args) { } void ICACHE_FLASH_ATTR -mqttTcpDisconnectedCb(uint32_t *args) { +cmdMqttTcpDisconnectedCb(uint32_t *args) { MQTT_Client* client = (MQTT_Client*)args; MqttCmdCb *cb = (MqttCmdCb*)client->user_data; os_printf("MQTT: TCP Disconnected\n"); @@ -25,7 +25,7 @@ mqttTcpDisconnectedCb(uint32_t *args) { } void ICACHE_FLASH_ATTR -mqttDisconnectedCb(uint32_t* args) { +cmdMqttDisconnectedCb(uint32_t* args) { MQTT_Client* client = (MQTT_Client*)args; MqttCmdCb* cb = (MqttCmdCb*)client->user_data; os_printf("MQTT: Disconnected\n"); @@ -34,7 +34,7 @@ mqttDisconnectedCb(uint32_t* args) { } void ICACHE_FLASH_ATTR -mqttPublishedCb(uint32_t* args) { +cmdMqttPublishedCb(uint32_t* args) { MQTT_Client* client = (MQTT_Client*)args; MqttCmdCb* cb = (MqttCmdCb*)client->user_data; os_printf("MQTT: Published\n"); @@ -43,7 +43,7 @@ mqttPublishedCb(uint32_t* args) { } void ICACHE_FLASH_ATTR -mqttDataCb(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t data_len) { +cmdMqttDataCb(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t data_len) { uint16_t crc = 0; MQTT_Client* client = (MQTT_Client*)args; MqttCmdCb* cb = (MqttCmdCb*)client->user_data; @@ -104,7 +104,7 @@ MQTTCMD_Setup(CmdPacket *cmd) { // init client // TODO: why malloc these all here, pass to MQTT_InitClient to be malloc'd again? - MQTT_InitClient(client, client_id, user_data, pass_data, keepalive, clean_session); + MQTT_InitClient(client, (char*)client_id, (char*)user_data, (char*)pass_data, keepalive, clean_session); // create callback MqttCmdCb* callback = (MqttCmdCb*)os_zalloc(sizeof(MqttCmdCb)); @@ -120,15 +120,15 @@ MQTTCMD_Setup(CmdPacket *cmd) { client->user_data = callback; - client->cmdConnectedCb = mqttConnectedCb; - client->cmdDisconnectedCb = mqttDisconnectedCb; - client->cmdPublishedCb = mqttPublishedCb; - client->cmdDataCb = mqttDataCb; + client->cmdConnectedCb = cmdMqttConnectedCb; + client->cmdDisconnectedCb = cmdMqttDisconnectedCb; + client->cmdPublishedCb = cmdMqttPublishedCb; + client->cmdDataCb = cmdMqttDataCb; if (CMD_GetArgc(&req) == 10) { CMD_PopArg(&req, (uint8_t*)&cb_data, 4); callback->tcpDisconnectedCb = cb_data; - client->cmdTcpDisconnectedCb = mqttTcpDisconnectedCb; + client->cmdTcpDisconnectedCb = cmdMqttTcpDisconnectedCb; } os_free(client_id); @@ -178,7 +178,7 @@ MQTTCMD_Lwt(CmdPacket *cmd) { // get retain CMD_PopArg(&req, (uint8_t*)&client->connect_info.will_retain, 4); - os_printf("MQTT: MQTTCMD_Lwt topic=%s, message=%s, qos=%ld, retain=%ld\n", + os_printf("MQTT: MQTTCMD_Lwt topic=%s, message=%s, qos=%d, retain=%d\n", client->connect_info.will_topic, client->connect_info.will_message, client->connect_info.will_qos, @@ -207,7 +207,7 @@ MQTTCMD_Connect(CmdPacket *cmd) { os_free(client->host); len = CMD_ArgLen(&req); if (len > 128) return 0; // safety check - client->host = (uint8_t*)os_zalloc(len + 1); + client->host = (char*)os_zalloc(len + 1); CMD_PopArg(&req, client->host, len); client->host[len] = 0; @@ -217,7 +217,7 @@ MQTTCMD_Connect(CmdPacket *cmd) { // get security CMD_PopArg(&req, (uint8_t*)&client->security, 4); - os_printf("MQTT: MQTTCMD_Connect host=%s, port=%ld, security=%ld\n", + os_printf("MQTT: MQTTCMD_Connect host=%s, port=%ld, security=%d\n", client->host, client->port, client->security); diff --git a/cmd/mqtt_cmd.h b/cmd/mqtt_cmd.h index 66fd8f6..9ccbd08 100644 --- a/cmd/mqtt_cmd.h +++ b/cmd/mqtt_cmd.h @@ -12,11 +12,11 @@ typedef struct { uint32_t tcpDisconnectedCb; } MqttCmdCb; -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Connect(CmdPacket *cmd); -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Disconnect(CmdPacket *cmd); -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Setup(CmdPacket *cmd); -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Publish(CmdPacket *cmd); -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Subscribe(CmdPacket *cmd); -uint32_t ICACHE_FLASH_ATTR MQTTCMD_Lwt(CmdPacket *cmd); +uint32_t MQTTCMD_Connect(CmdPacket *cmd); +uint32_t MQTTCMD_Disconnect(CmdPacket *cmd); +uint32_t MQTTCMD_Setup(CmdPacket *cmd); +uint32_t MQTTCMD_Publish(CmdPacket *cmd); +uint32_t MQTTCMD_Subscribe(CmdPacket *cmd); +uint32_t MQTTCMD_Lwt(CmdPacket *cmd); #endif /* MODULES_MQTT_CMD_H_ */ diff --git a/cmd/rest.c b/cmd/rest.c index 75b5b0b..3ae6575 100644 --- a/cmd/rest.c +++ b/cmd/rest.c @@ -16,8 +16,6 @@ static RestClient restClient[MAX_REST]; static uint8_t restNum = 0xff; // index into restClient for next slot to allocate #define REST_CB 0xbeef0000 // fudge added to callback for arduino so we can detect problems -extern uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const char* str, void *ip); - static void ICACHE_FLASH_ATTR tcpclient_discon_cb(void *arg) { struct espconn *pespconn = (struct espconn *)arg; diff --git a/cmd/rest.h b/cmd/rest.h index 7161e1f..9c6cd5c 100644 --- a/cmd/rest.h +++ b/cmd/rest.h @@ -36,5 +36,6 @@ typedef struct { uint32_t REST_Setup(CmdPacket *cmd); uint32_t REST_Request(CmdPacket *cmd); uint32_t REST_SetHeader(CmdPacket *cmd); +uint8_t UTILS_StrToIP(const char* str, void *ip); #endif /* MODULES_INCLUDE_API_H_ */ diff --git a/esp-link.vcxproj b/esp-link.vcxproj index 760a8ef..f371f1a 100644 --- a/esp-link.vcxproj +++ b/esp-link.vcxproj @@ -28,7 +28,7 @@ __ets__;_STDINT_H;ICACHE_FLASH;__MINGW32__;__WIN32__ - .\mqtt;.\cmd;.\serial;.\user;.\espfs;.\httpd;.\include;..\esp_iot_sdk_v1.3.0\include;..\xtensa-lx106-elf\xtensa-lx106-elf\include;c:\tools\mingw64\x86_64-w64-mingw32\include;c:\tools\mingw64\lib\gcc\x86_64-w64-mingw32\4.8.3\include + .\esp-link;.\mqtt;.\cmd;.\serial;.\user;.\espfs;.\httpd;.\include;..\esp_iot_sdk_v1.3.0\include;..\xtensa-lx106-elf\xtensa-lx106-elf\include;c:\tools\mingw64\x86_64-w64-mingw32\include;c:\tools\mingw64\lib\gcc\x86_64-w64-mingw32\4.8.3\include @@ -81,7 +81,6 @@ - @@ -128,7 +127,6 @@ - diff --git a/esp-link/cgiwifi.h b/esp-link/cgiwifi.h index d73628e..3f7fe09 100644 --- a/esp-link/cgiwifi.h +++ b/esp-link/cgiwifi.h @@ -4,6 +4,7 @@ #include "httpd.h" enum { wifiIsDisconnected, wifiIsConnected, wifiGotIP }; +typedef void(*WifiStateChangeCb)(uint8_t wifiStatus); int cgiWiFiScan(HttpdConnData *connData); int cgiWifiInfo(HttpdConnData *connData); @@ -13,8 +14,9 @@ int cgiWiFiSetMode(HttpdConnData *connData); int cgiWiFiConnStatus(HttpdConnData *connData); int cgiWiFiSpecial(HttpdConnData *connData); void wifiInit(void); +void wifiAddStateChangeCb(WifiStateChangeCb cb); extern uint8_t wifiState; -extern void (*wifiStatusCb)(uint8_t); // callback when wifi status changes +//extern void (*wifiStatusCb)(uint8_t); // callback when wifi status changes #endif diff --git a/esp-link/main.c b/esp-link/main.c index 417a1b2..5febf87 100644 --- a/esp-link/main.c +++ b/esp-link/main.c @@ -118,7 +118,7 @@ static char *rst_codes[] = { # define VERS_STR_STR(V) #V # define VERS_STR(V) VERS_STR_STR(V) -char *esp_link_version = VERS_STR(VERSION); +char* esp_link_version = VERS_STR(VERSION); //Main routine. Initialize stdout, the I/O, filesystem and the webserver and we're done. void user_init(void) { diff --git a/httpd/httpdespfs.c b/httpd/httpdespfs.c index b6853aa..c6f2c0c 100644 --- a/httpd/httpdespfs.c +++ b/httpd/httpdespfs.c @@ -12,12 +12,7 @@ Connector to let httpd use the espfs filesystem to serve the files in it. * Modified and enhanced by Thorsten von Eicken in 2015 * ---------------------------------------------------------------------------- */ - -#include #include "httpdespfs.h" -#include "espfs.h" -#include "espfsformat.h" -#include "cgi.h" // The static files marked with FLAG_GZIP are compressed and will be served with GZIP compression. // If the client does not advertise that he accepts GZIP send following warning message (telnet users for e.g.) @@ -27,7 +22,8 @@ static const char *gzipNonSupportedMessage = "HTTP/1.0 501 Not implemented\r\nSe //This is a catch-all cgi function. It takes the url passed to it, looks up the corresponding //path in the filesystem and if it exists, passes the file through. This simulates what a normal //webserver would do with static files. -int ICACHE_FLASH_ATTR cgiEspFsHook(HttpdConnData *connData) { +int ICACHE_FLASH_ATTR +cgiEspFsHook(HttpdConnData *connData) { EspFsFile *file=connData->cgiData; int len; char buff[1024]; @@ -93,7 +89,8 @@ int ICACHE_FLASH_ATTR cgiEspFsHook(HttpdConnData *connData) { #if 0 //cgiEspFsHtml is a simple HTML file that gets prefixed by head.tpl -int ICACHE_FLASH_ATTR cgiEspFsHtml(HttpdConnData *connData) { +int ICACHE_FLASH_ATTR +cgiEspFsHtml(HttpdConnData *connData) { EspFsFile *file = connData->cgiData; char buff[2048]; diff --git a/httpd/httpdespfs.h b/httpd/httpdespfs.h index fb07008..847a8b6 100644 --- a/httpd/httpdespfs.h +++ b/httpd/httpdespfs.h @@ -1,10 +1,14 @@ #ifndef HTTPDESPFS_H #define HTTPDESPFS_H +#include +#include "espfs.h" +#include "espfsformat.h" +#include "cgi.h" #include "httpd.h" int cgiEspFsHook(HttpdConnData *connData); -int ICACHE_FLASH_ATTR cgiEspFsTemplate(HttpdConnData *connData); +//int cgiEspFsTemplate(HttpdConnData *connData); //int ICACHE_FLASH_ATTR cgiEspFsHtml(HttpdConnData *connData); #endif diff --git a/include/esp8266.h b/include/esp8266.h index d1f5cc4..c3d0c40 100644 --- a/include/esp8266.h +++ b/include/esp8266.h @@ -16,6 +16,7 @@ #include "espmissingincludes.h" #include "uart_hw.h" +extern char* esp_link_version; void ICACHE_FLASH_ATTR init(void); diff --git a/include/user_config.h b/include/user_config.h index 16f7f9c..7bdfd38 100644 --- a/include/user_config.h +++ b/include/user_config.h @@ -1,40 +1,20 @@ #ifndef _USER_CONFIG_H_ #define _USER_CONFIG_H_ -/*DEFAULT CONFIGURATIONS*/ - -#define MQTT_HOST "mqtt.yourdomain.com" //or "mqtt.yourdomain.com" -#define MQTT_PORT 1883 +#define MQTT_RECONNECT_TIMEOUT 5 // seconds #define MQTT_BUF_SIZE 1024 -#define MQTT_KEEPALIVE 120 /*second*/ - -#define MQTT_CLIENT_ID "H_%08X" //Cuidar para não colocar valores execendentes da ESTRUTURA SYSCFG -#define MQTT_USER "DVES_USER" -#define MQTT_PASS "DVES_PASS" - -#define STA_SSID "TESTE" -#define STA_PASS "54545" -#define STA_TYPE AUTH_WPA2_PSK -#define MQTT_RECONNECT_TIMEOUT 5 /*second*/ - -#define DEFAULT_SECURITY 0 -#define QUEUE_BUFFER_SIZE 2048 - -//#undef MCU_RESET_PIN -//#undef MCU_ISP_PIN -//#undef LED_CONN_PIN -//#undef LED_SERIAL_PIN -// -//#define MCU_RESET_PIN 2 -//#define MCU_ISP_PIN -1 -//#define LED_CONN_PIN -1 -//#define LED_SERIAL_PIN -1 +#define MQTT_HOST "10.0.0.220" // "mqtt.yourdomain.com" or ip "10.0.0.1" +#define MQTT_PORT 1883 +#define MQTT_SECURITY 0 -//#define BAUD_RATE 9600 -//#define HOSTNAME "nodemcu\0 " +#define MQTT_CLIENT_ID "esp-link" // "" +#define MQTT_USER "" +#define MQTT_PASS "" +#define MQTT_KEEPALIVE 120 // seconds +#define MQTT_CLSESSION true -#define PROTOCOL_NAMEv31 /*MQTT version 3.1 compatible with Mosquitto v0.15*/ -//PROTOCOL_NAMEv311 /*MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/*/ +#define PROTOCOL_NAMEv31 // MQTT version 3.1 compatible with Mosquitto v0.15 +//PROTOCOL_NAMEv311 // MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/ #endif \ No newline at end of file diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c index 9d435c8..12b9514 100644 --- a/mqtt/mqtt.c +++ b/mqtt/mqtt.c @@ -471,14 +471,16 @@ MQTT_Task(os_event_t* e) { * @retval None */ void ICACHE_FLASH_ATTR -MQTT_InitConnection(MQTT_Client* mqttClient, uint8_t* host, uint32 port, uint8_t security) { - uint32_t temp; +MQTT_InitConnection(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security) { os_printf("MQTT_InitConnection\n"); - os_memset(mqttClient, 0, sizeof(MQTT_Client)); - temp = sizeof((char*)host); - mqttClient->host = (uint8_t*)os_zalloc(temp + 1); - os_strcpy((char*)mqttClient->host, (char*)host); + uint8_t len = sizeof(MQTT_Client); + os_memset(mqttClient, 0, len); + + uint32_t temp = os_strlen(host); + mqttClient->host = (char*)os_zalloc(temp + 1); + os_strcpy(mqttClient->host, host); mqttClient->host[temp] = 0; + mqttClient->port = port; mqttClient->security = security; } @@ -493,25 +495,25 @@ MQTT_InitConnection(MQTT_Client* mqttClient, uint8_t* host, uint32 port, uint8_t * @retval None */ void ICACHE_FLASH_ATTR -MQTT_InitClient(MQTT_Client* mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint8_t keepAliveTime, uint8_t cleanSession) { +MQTT_InitClient(MQTT_Client* mqttClient, char* client_id, char* client_user, char* client_pass, uint8_t keepAliveTime, uint8_t cleanSession) { uint32_t temp; os_printf("MQTT_InitClient\n"); os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); - temp = os_strlen((char*)client_id); + temp = os_strlen(client_id); mqttClient->connect_info.client_id = (char*)os_zalloc(temp + 1); - os_strcpy((char*)mqttClient->connect_info.client_id, (char*)client_id); + os_strcpy(mqttClient->connect_info.client_id, client_id); mqttClient->connect_info.client_id[temp] = 0; - temp = os_strlen((char*)client_user); + temp = os_strlen(client_user); mqttClient->connect_info.username = (char*)os_zalloc(temp + 1); - os_strcpy((char*)mqttClient->connect_info.username, (char*)client_user); + os_strcpy(mqttClient->connect_info.username, client_user); mqttClient->connect_info.username[temp] = 0; - temp = os_strlen((char*)client_pass); + temp = os_strlen(client_pass); mqttClient->connect_info.password = (char*)os_zalloc(temp + 1); - os_strcpy((char*)mqttClient->connect_info.password, (char*)client_pass); + os_strcpy(mqttClient->connect_info.password, client_pass); mqttClient->connect_info.password[temp] = 0; @@ -533,7 +535,7 @@ MQTT_InitClient(MQTT_Client* mqttClient, uint8_t* client_id, uint8_t* client_use } void ICACHE_FLASH_ATTR -MQTT_InitLWT(MQTT_Client* mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain) { +MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, uint8_t will_qos, uint8_t will_retain) { uint32_t temp; temp = os_strlen((char*)will_topic); mqttClient->connect_info.will_topic = (char*)os_zalloc(temp + 1); @@ -576,7 +578,7 @@ MQTT_Connect(MQTT_Client* mqttClient) { os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); os_timer_arm(&mqttClient->mqttTimer, 1000, 1); - if (UTILS_StrToIP((const int8_t *)mqttClient->host, (void*)&mqttClient->pCon->proto.tcp->remote_ip)) { + if (UTILS_StrToIP((const char *)mqttClient->host, (void*)&mqttClient->pCon->proto.tcp->remote_ip)) { os_printf("MQTT-TCP: Connect to ip %s:%ld\n", mqttClient->host, mqttClient->port); #ifdef CLIENT_SSL_ENABLE if (mqttClient->security){ diff --git a/mqtt/mqtt.h b/mqtt/mqtt.h index d0fdfb5..9ad6825 100644 --- a/mqtt/mqtt.h +++ b/mqtt/mqtt.h @@ -33,7 +33,7 @@ #include #include "mqtt_msg.h" #include "queue.h" -#include "utils.h" +#include typedef struct mqtt_event_data_t { uint8_t type; @@ -87,8 +87,8 @@ typedef void (*MqttDataCallback)(uint32_t* args, const char* topic, uint32_t top typedef struct { struct espconn* pCon; - uint32_t security; - uint8_t* host; + uint8_t security; + char* host; uint32_t port; ip_addr_t ip; mqtt_state_t mqtt_state; @@ -129,16 +129,16 @@ typedef struct { #define MQTT_EVENT_TYPE_EXITED 7 #define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8 -void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client* mqttClient, uint8_t* host, uint32 port, uint8_t security); -void ICACHE_FLASH_ATTR MQTT_InitClient(MQTT_Client* mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint8_t keepAliveTime, uint8_t cleanSession); -void ICACHE_FLASH_ATTR MQTT_InitLWT(MQTT_Client* mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain); -void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb); -void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb); -void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb); -void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb); -bool ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos); -void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client* mqttClient); -void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client* mqttClient); -bool ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain); +void MQTT_InitConnection(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security); +void MQTT_InitClient(MQTT_Client* mqttClient, char* client_id, char* client_user, char* client_pass, uint8_t keepAliveTime, uint8_t cleanSession); +void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, uint8_t will_qos, uint8_t will_retain); +void MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb); +void MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb); +void MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb); +void MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb); +bool MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos); +void MQTT_Connect(MQTT_Client* mqttClient); +void MQTT_Disconnect(MQTT_Client* mqttClient); +bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain); #endif /* USER_AT_MQTT_H_ */ diff --git a/mqtt/mqtt_msg.h b/mqtt/mqtt_msg.h index ce840b7..bf305df 100644 --- a/mqtt/mqtt_msg.h +++ b/mqtt/mqtt_msg.h @@ -72,9 +72,9 @@ typedef struct mqtt_connect_info { char* will_topic; char* will_message; uint32_t keepalive; - uint32_t will_qos; - uint32_t will_retain; - uint32_t clean_session; + uint8_t will_qos; + uint8_t will_retain; + uint8_t clean_session; } mqtt_connect_info_t; @@ -95,23 +95,23 @@ static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } -void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); -int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length); -const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); -const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); -uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length); - -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection); -mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection); +void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +int mqtt_get_total_length(uint8_t* buffer, uint16_t length); +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); + +mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); +mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); +mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); +mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); #endif // MQTT_MSG_H diff --git a/mqtt/proto.h b/mqtt/proto.h index 8d20832..67b3e26 100644 --- a/mqtt/proto.h +++ b/mqtt/proto.h @@ -1,10 +1,3 @@ -/* - * File: proto.h - * Author: ThuHien - * - * Created on November 23, 2012, 8:57 AM - */ - #ifndef _PROTO_H_ #define _PROTO_H_ #include @@ -21,8 +14,8 @@ typedef struct { PROTO_PARSE_CALLBACK* callback; } PROTO_PARSER; -int8_t ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize); -int16_t ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len); -int8_t ICACHE_FLASH_ATTR PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value); -int16_t ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen); +int8_t PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize); +int16_t PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len); +int8_t PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value); +int16_t PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen); #endif diff --git a/mqtt/queue.h b/mqtt/queue.h index 78b7882..bdbc503 100644 --- a/mqtt/queue.h +++ b/mqtt/queue.h @@ -39,8 +39,8 @@ typedef struct { RINGBUF rb; } QUEUE; -void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE* queue, int bufferSize); -int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len); -int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen); -bool ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE* queue); +void QUEUE_Init(QUEUE* queue, int bufferSize); +int32_t QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len); +int32_t QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen); +bool QUEUE_IsEmpty(QUEUE* queue); #endif /* USER_QUEUE_H_ */ diff --git a/mqtt/ringbuf.h b/mqtt/ringbuf.h index 7d9f8e9..d504d53 100644 --- a/mqtt/ringbuf.h +++ b/mqtt/ringbuf.h @@ -11,7 +11,7 @@ typedef struct { int32_t size; /**< Buffer size */ } RINGBUF; -int16_t ICACHE_FLASH_ATTR RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size); -int16_t ICACHE_FLASH_ATTR RINGBUF_Put(RINGBUF* r, uint8_t c); -int16_t ICACHE_FLASH_ATTR RINGBUF_Get(RINGBUF* r, uint8_t* c); +int16_t RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size); +int16_t RINGBUF_Put(RINGBUF* r, uint8_t c); +int16_t RINGBUF_Get(RINGBUF* r, uint8_t* c); #endif diff --git a/mqtt/utils.c b/mqtt/utils.c deleted file mode 100644 index 86aff20..0000000 --- a/mqtt/utils.c +++ /dev/null @@ -1,140 +0,0 @@ -/* -* Copyright (c) 2014, Tuan PM -* Email: tuanpm@live.com -* -* All rights reserved. -* -* Redistribution and use in source and binary forms, with or without -* modification, are permitted provided that the following conditions -* are met: -* -* 1. Redistributions of source code must retain the above copyright -* notice, this list of conditions and the following disclaimer. -* 2. Redistributions in binary form must reproduce the above copyright -* notice, this list of conditions and the following disclaimer in the -* documentation and/or other materials provided with the distribution. -* 3. Neither the name of the copyright holder nor the names of its -* contributors may be used to endorse or promote products derived -* from this software without specific prior written permission. -* -* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -* POSSIBILITY OF SUCH DAMAGE. -* -*/ -#include "utils.h" - -uint8_t ICACHE_FLASH_ATTR -UTILS_IsIPV4(int8_t* str) { - uint8_t segs = 0; /* Segment count. */ - uint8_t chcnt = 0; /* Character count within segment. */ - uint8_t accum = 0; /* Accumulator for segment. */ - /* Catch NULL pointer. */ - if (str == 0) - return 0; - /* Process every character in string. */ - - while (*str != '\0') { - /* Segment changeover. */ - - if (*str == '.') { - /* Must have some digits in segment. */ - if (chcnt == 0) - return 0; - /* Limit number of segments. */ - if (++segs == 4) - return 0; - /* Reset segment values and restart loop. */ - chcnt = accum = 0; - str++; - continue; - } - - /* Check numeric. */ - if ((*str < '0') || (*str > '9')) - return 0; - - /* Accumulate and check segment. */ - - if ((accum = accum * 10 + *str - '0') > 255) - return 0; - /* Advance other segment specific stuff and continue loop. */ - - chcnt++; - str++; - } - - /* Check enough segments and enough characters in last segment. */ - - if (segs != 3) - return 0; - if (chcnt == 0) - return 0; - /* Address okay. */ - - return 1; -} - -uint8_t ICACHE_FLASH_ATTR -UTILS_StrToIP(const int8_t* str, void* ip) { - - /* The count of the number of bytes processed. */ - int i; - /* A pointer to the next digit to process. */ - for (i = 0; i < 4; i++) { - /* The digit being processed. */ - char c; - /* The value of this byte. */ - int n = 0; - while (1) { - c = *(const char *)str; - (const char *)str++; - if (c >= '0' && c <= '9') { - n *= 10; - n += c - '0'; - } - /* We insist on stopping at "." if we are still parsing - the first, second, or third numbers. If we have reached - the end of the numbers, we will allow any character. */ - else if ((i < 3 && c == '.') || i == 3) { - break; - } - else { - return 0; - } - } - if (n >= 256) { - return 0; - } - ((uint8_t*)ip)[i] = n; - } - return 1; -} - -uint32_t ICACHE_FLASH_ATTR -UTILS_Atoh(const int8_t* s) { - uint32_t value = 0, digit; - int8_t c; - - while ((c = *s++)) { - if ('0' <= c && c <= '9') - digit = c - '0'; - else if ('A' <= c && c <= 'F') - digit = c - 'A' + 10; - else if ('a' <= c && c <= 'f') - digit = c - 'a' + 10; - else break; - - value = (value << 4) | digit; - } - - return value; -} diff --git a/mqtt/utils.h b/mqtt/utils.h deleted file mode 100644 index bc4d2af..0000000 --- a/mqtt/utils.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef _UTILS_H_ -#define _UTILS_H_ - -#include - -uint32_t ICACHE_FLASH_ATTR UTILS_Atoh(const int8_t* s); -uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void* ip); -uint8_t ICACHE_FLASH_ATTR UTILS_IsIPV4(int8_t* str); -#endif diff --git a/user/user_main.c b/user/user_main.c index 31e62a1..5738807 100644 --- a/user/user_main.c +++ b/user/user_main.c @@ -1,5 +1,70 @@ #include +#include +#include + +MQTT_Client mqttClient; + +void ICACHE_FLASH_ATTR +mqttConnectedCb(uint32_t *args) { + MQTT_Client* client = (MQTT_Client*)args; + MQTT_Publish(client, "announce/all", "Hello World!", 0, 0); +} + +void ICACHE_FLASH_ATTR +mqttDisconnectedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT Disconnected\n"); +} + +void ICACHE_FLASH_ATTR +mqttTcpDisconnectedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT TCP Disconnected\n"); +} + +void ICACHE_FLASH_ATTR +mqttPublishedCb(uint32_t *args) { +// MQTT_Client* client = (MQTT_Client*)args; + os_printf("MQTT Published\n"); +} + +void ICACHE_FLASH_ATTR +mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len) { + char *topicBuf = (char*)os_zalloc(topic_len + 1); + char *dataBuf = (char*)os_zalloc(data_len + 1); + +// MQTT_Client* client = (MQTT_Client*)args; + + os_memcpy(topicBuf, topic, topic_len); + topicBuf[topic_len] = 0; + + os_memcpy(dataBuf, data, data_len); + dataBuf[data_len] = 0; + + os_printf("Receive topic: %s, data: %s\n", topicBuf, dataBuf); + os_free(topicBuf); + os_free(dataBuf); +} + +void ICACHE_FLASH_ATTR +wifiStateChangeCb(uint8_t status) +{ + if (status == wifiGotIP && mqttClient.connState != TCP_CONNECTING){ + MQTT_Connect(&mqttClient); + } + else if (status == wifiIsDisconnected && mqttClient.connState == TCP_CONNECTING){ + MQTT_Disconnect(&mqttClient); + } +} void init() { - + wifiAddStateChangeCb(wifiStateChangeCb); + MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY); + MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION); + MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0); + MQTT_OnConnected(&mqttClient, mqttConnectedCb); + MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); + MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb); + MQTT_OnPublished(&mqttClient, mqttPublishedCb); + MQTT_OnData(&mqttClient, mqttDataCb); } \ No newline at end of file