massive rework of MQTT

pull/47/head
Thorsten von Eicken 9 years ago
parent da7a1b913f
commit 9e4cbb0dbd
  1. 2
      Makefile
  2. 2
      cmd/handlers.c
  3. 11
      cmd/mqtt_cmd.c
  4. 1
      cmd/mqtt_cmd.h
  5. 12
      cmd/rest.c
  6. 46
      esp-link/cgipins.c
  7. 10
      esp-link/main.c
  8. 176
      esp-link/status.c
  9. 2
      html/console.html
  10. 2
      httpd/httpd.c
  11. 16
      include/user_config.h
  12. 924
      mqtt/mqtt.c
  13. 171
      mqtt/mqtt.h
  14. 9
      mqtt/mqtt_msg.c
  15. 48
      mqtt/mqtt_msg.h
  16. 64
      mqtt/pktbuf.c
  17. 29
      mqtt/pktbuf.h
  18. 86
      mqtt/proto.c
  19. 21
      mqtt/proto.h
  20. 53
      mqtt/queue.c
  21. 46
      mqtt/queue.h
  22. 63
      mqtt/ringbuf.c
  23. 17
      mqtt/ringbuf.h
  24. 1
      serial/serbridge.c
  25. 1
      serial/serled.h
  26. 2
      serial/uart.c
  27. 49
      user/user_main.c

@ -147,7 +147,7 @@ MODULES = espfs httpd user serial cmd mqtt esp-link
EXTRA_INCDIR = include . EXTRA_INCDIR = include .
# libraries used in this project, mainly provided by the SDK # libraries used in this project, mainly provided by the SDK
LIBS = c gcc hal phy pp net80211 wpa main lwip LIBS = c gcc hal phy pp net80211 wpa main lwip # crypto ssl
# compiler flags using during compilation of source files # compiler flags using during compilation of source files
CFLAGS = -Os -ggdb -std=c99 -Werror -Wpointer-arith -Wundef -Wall -Wl,-EL -fno-inline-functions \ CFLAGS = -Os -ggdb -std=c99 -Werror -Wpointer-arith -Wundef -Wall -Wl,-EL -fno-inline-functions \

@ -95,7 +95,7 @@ cmdCallback* ICACHE_FLASH_ATTR
CMD_GetCbByName(char* name) { CMD_GetCbByName(char* name) {
char checkname[16]; char checkname[16];
os_strncpy(checkname, name, sizeof(checkname)); os_strncpy(checkname, name, sizeof(checkname));
for (uint8_t i = 0; i < sizeof(commands); i++) { for (uint8_t i = 0; i < MAX_CALLBACKS; i++) {
//os_printf("CMD_GetCbByName: index %d name=%s cb=%p\n", i, callbacks[i].name, //os_printf("CMD_GetCbByName: index %d name=%s cb=%p\n", i, callbacks[i].name,
// (void *)callbacks[i].callback); // (void *)callbacks[i].callback);
// if callback doesn't exist or it's null // if callback doesn't exist or it's null

@ -1,3 +1,5 @@
#include <esp8266.h>
#include "mqtt.h"
#include "mqtt_cmd.h" #include "mqtt_cmd.h"
uint32_t connectedCb = 0, disconnectCb = 0, tcpDisconnectedCb = 0, publishedCb = 0, dataCb = 0; uint32_t connectedCb = 0, disconnectCb = 0, tcpDisconnectedCb = 0, publishedCb = 0, dataCb = 0;
@ -65,10 +67,12 @@ MQTTCMD_Setup(CmdPacket *cmd) {
// create mqtt client // create mqtt client
uint8_t clientLen = sizeof(MQTT_Client); uint8_t clientLen = sizeof(MQTT_Client);
MQTT_Client* client = (MQTT_Client*)os_zalloc(clientLen); MQTT_Client* client = (MQTT_Client*)os_zalloc(clientLen);
if (client == NULL) if (client == NULL) return 0;
return 0;
os_memset(client, 0, clientLen); os_memset(client, 0, clientLen);
return 0;
#if 0
uint16_t len; uint16_t len;
uint8_t *client_id, *user_data, *pass_data; uint8_t *client_id, *user_data, *pass_data;
uint32_t keepalive, clean_session, cb_data; uint32_t keepalive, clean_session, cb_data;
@ -136,6 +140,7 @@ MQTTCMD_Setup(CmdPacket *cmd) {
os_free(pass_data); os_free(pass_data);
return (uint32_t)client; return (uint32_t)client;
#endif
} }
uint32_t ICACHE_FLASH_ATTR uint32_t ICACHE_FLASH_ATTR
@ -217,7 +222,7 @@ MQTTCMD_Connect(CmdPacket *cmd) {
// get security // get security
CMD_PopArg(&req, (uint8_t*)&client->security, 4); CMD_PopArg(&req, (uint8_t*)&client->security, 4);
os_printf("MQTT: MQTTCMD_Connect host=%s, port=%ld, security=%d\n", os_printf("MQTT: MQTTCMD_Connect host=%s, port=%d, security=%d\n",
client->host, client->host,
client->port, client->port,
client->security); client->security);

@ -2,7 +2,6 @@
#define MODULES_MQTT_CMD_H_ #define MODULES_MQTT_CMD_H_
#include "cmd.h" #include "cmd.h"
#include "mqtt.h"
typedef struct { typedef struct {
uint32_t connectedCb; uint32_t connectedCb;

@ -154,12 +154,12 @@ rest_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) {
if(client->ip.addr == 0 && ipaddr->addr != 0) { if(client->ip.addr == 0 && ipaddr->addr != 0) {
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
//if(client->security){ #ifdef CLIENT_SSL_ENABLE
// espconn_secure_connect(client->pCon); if(client->security) {
//} espconn_secure_connect(client->pCon);
//else { } else
#endif
espconn_connect(client->pCon); espconn_connect(client->pCon);
//}
os_printf("REST: connecting...\n"); os_printf("REST: connecting...\n");
} }
} }
@ -368,6 +368,8 @@ REST_Request(CmdPacket *cmd) {
} }
os_printf("\n"); os_printf("\n");
//os_printf("REST request: %s", (char*)client->data);
os_printf("REST: pCon state=%d\n", client->pCon->state); os_printf("REST: pCon state=%d\n", client->pCon->state);
client->pCon->state = ESPCONN_NONE; client->pCon->state = ESPCONN_NONE;
espconn_regist_connectcb(client->pCon, tcpclient_connect_cb); espconn_regist_connectcb(client->pCon, tcpclient_connect_cb);

@ -24,9 +24,9 @@ static const int num_map_func = sizeof(map_func)/sizeof(char*);
// Cgi to return choice of pin assignments // Cgi to return choice of pin assignments
int ICACHE_FLASH_ATTR cgiPinsGet(HttpdConnData *connData) { int ICACHE_FLASH_ATTR cgiPinsGet(HttpdConnData *connData) {
if (connData->conn==NULL) return HTTPD_CGI_DONE; // Connection aborted if (connData->conn==NULL) return HTTPD_CGI_DONE; // Connection aborted
char buff[2048]; char buff[2048];
int len; int len;
// figure out current mapping // figure out current mapping
@ -62,27 +62,27 @@ int ICACHE_FLASH_ATTR cgiPinsGet(HttpdConnData *connData) {
} }
len += os_sprintf(buff+len, "\n] }"); len += os_sprintf(buff+len, "\n] }");
jsonHeader(connData, 200); jsonHeader(connData, 200);
httpdSend(connData, buff, len); httpdSend(connData, buff, len);
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
// Cgi to change choice of pin assignments // Cgi to change choice of pin assignments
int ICACHE_FLASH_ATTR cgiPinsSet(HttpdConnData *connData) { int ICACHE_FLASH_ATTR cgiPinsSet(HttpdConnData *connData) {
if (connData->conn==NULL) { if (connData->conn==NULL) {
return HTTPD_CGI_DONE; // Connection aborted return HTTPD_CGI_DONE; // Connection aborted
} }
char buff[128]; char buff[128];
int len = httpdFindArg(connData->getArgs, "map", buff, sizeof(buff)); int len = httpdFindArg(connData->getArgs, "map", buff, sizeof(buff));
if (len <= 0) { if (len <= 0) {
jsonHeader(connData, 400); jsonHeader(connData, 400);
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
int m = atoi(buff); int m = atoi(buff);
if (m < 0 || m >= num_map_names) { if (m < 0 || m >= num_map_names) {
jsonHeader(connData, 400); jsonHeader(connData, 400);
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
@ -106,17 +106,17 @@ int ICACHE_FLASH_ATTR cgiPinsSet(HttpdConnData *connData) {
httpdEndHeaders(connData); httpdEndHeaders(connData);
httpdSend(connData, "Failed to save config", -1); httpdSend(connData, "Failed to save config", -1);
} }
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
int ICACHE_FLASH_ATTR cgiPins(HttpdConnData *connData) { int ICACHE_FLASH_ATTR cgiPins(HttpdConnData *connData) {
if (connData->conn==NULL) return HTTPD_CGI_DONE; // Connection aborted. Clean up. if (connData->conn==NULL) return HTTPD_CGI_DONE; // Connection aborted. Clean up.
if (connData->requestType == HTTPD_METHOD_GET) { if (connData->requestType == HTTPD_METHOD_GET) {
return cgiPinsGet(connData); return cgiPinsGet(connData);
} else if (connData->requestType == HTTPD_METHOD_POST) { } else if (connData->requestType == HTTPD_METHOD_POST) {
return cgiPinsSet(connData); return cgiPinsSet(connData);
} else { } else {
jsonHeader(connData, 404); jsonHeader(connData, 404);
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
} }

@ -29,7 +29,7 @@
#include "log.h" #include "log.h"
#include <gpio.h> #include <gpio.h>
//#define SHOW_HEAP_USE #define SHOW_HEAP_USE
//Function that tells the authentication system what users/passwords live on the system. //Function that tells the authentication system what users/passwords live on the system.
//This is disabled in the default build; if you want to try it, enable the authBasic line in //This is disabled in the default build; if you want to try it, enable the authBasic line in
@ -120,13 +120,16 @@ static char *rst_codes[] = {
# define VERS_STR(V) VERS_STR_STR(V) # define VERS_STR(V) VERS_STR_STR(V)
char* esp_link_version = VERS_STR(VERSION); char* esp_link_version = VERS_STR(VERSION);
//Main routine. Initialize stdout, the I/O, filesystem and the webserver and we're done. extern void app_init(void);
// Main routine to initialize esp-link.
void user_init(void) { void user_init(void) {
// get the flash config so we know how to init things // get the flash config so we know how to init things
//configWipe(); // uncomment to reset the config for testing purposes //configWipe(); // uncomment to reset the config for testing purposes
bool restoreOk = configRestore(); bool restoreOk = configRestore();
// init gpio pin registers // init gpio pin registers
gpio_init(); gpio_init();
gpio_output_set(0, 0, 0, (1<<15)); // some people tie it GND, gotta ensure it's disabled
// init UART // init UART
uart_init(flashConfig.baud_rate, 115200); uart_init(flashConfig.baud_rate, 115200);
logInit(); // must come after init of uart logInit(); // must come after init of uart
@ -163,6 +166,5 @@ void user_init(void) {
os_printf("** esp-link ready\n"); os_printf("** esp-link ready\n");
// call user_main init app_init();
init();
} }

@ -14,56 +14,56 @@ static ETSTimer ledTimer;
static void ICACHE_FLASH_ATTR setLed(int on) { static void ICACHE_FLASH_ATTR setLed(int on) {
int8_t pin = flashConfig.conn_led_pin; int8_t pin = flashConfig.conn_led_pin;
if (pin < 0) return; // disabled if (pin < 0) return; // disabled
// LED is active-low // LED is active-low
if (on) { if (on) {
gpio_output_set(0, (1<<pin), (1<<pin), 0); gpio_output_set(0, (1<<pin), (1<<pin), 0);
} else { } else {
gpio_output_set((1<<pin), 0, (1<<pin), 0); gpio_output_set((1<<pin), 0, (1<<pin), 0);
} }
} }
static uint8_t ledState = 0; static uint8_t ledState = 0;
// Timer callback to update the LED // Timer callback to update the LED
static void ICACHE_FLASH_ATTR ledTimerCb(void *v) { static void ICACHE_FLASH_ATTR ledTimerCb(void *v) {
int time = 1000; int time = 1000;
if (wifiState == wifiGotIP) { if (wifiState == wifiGotIP) {
// connected, all is good, solid light with a short dark blip every 3 seconds // connected, all is good, solid light with a short dark blip every 3 seconds
ledState = 1-ledState; ledState = 1-ledState;
time = ledState ? 2900 : 100; time = ledState ? 2900 : 100;
} else if (wifiState == wifiIsConnected) { } else if (wifiState == wifiIsConnected) {
// waiting for DHCP, go on/off every second // waiting for DHCP, go on/off every second
ledState = 1 - ledState; ledState = 1 - ledState;
time = 1000; time = 1000;
} else { } else {
// not connected // not connected
switch (wifi_get_opmode()) { switch (wifi_get_opmode()) {
case 1: // STA case 1: // STA
ledState = 0; ledState = 0;
break; break;
case 2: // AP case 2: // AP
ledState = 1-ledState; ledState = 1-ledState;
time = ledState ? 50 : 1950; time = ledState ? 50 : 1950;
break; break;
case 3: // STA+AP case 3: // STA+AP
ledState = 1-ledState; ledState = 1-ledState;
time = ledState ? 50 : 950; time = ledState ? 50 : 950;
break; break;
} }
} }
setLed(ledState); setLed(ledState);
os_timer_arm(&ledTimer, time, 0); os_timer_arm(&ledTimer, time, 0);
} }
// change the wifi state indication // change the wifi state indication
void ICACHE_FLASH_ATTR statusWifiUpdate(uint8_t state) { void ICACHE_FLASH_ATTR statusWifiUpdate(uint8_t state) {
wifiState = state; wifiState = state;
// schedule an update (don't want to run into concurrency issues) // schedule an update (don't want to run into concurrency issues)
os_timer_disarm(&ledTimer); os_timer_disarm(&ledTimer);
os_timer_setfn(&ledTimer, ledTimerCb, NULL); os_timer_setfn(&ledTimer, ledTimerCb, NULL);
os_timer_arm(&ledTimer, 500, 0); os_timer_arm(&ledTimer, 500, 0);
} }
//===== RSSI Status update sent to GroveStreams //===== RSSI Status update sent to GroveStreams
@ -76,59 +76,59 @@ static ETSTimer rssiTimer;
// Timer callback to send an RSSI update to a monitoring system // Timer callback to send an RSSI update to a monitoring system
static void ICACHE_FLASH_ATTR rssiTimerCb(void *v) { static void ICACHE_FLASH_ATTR rssiTimerCb(void *v) {
if (!flashConfig.rssi_enable || !flashConfig.tcp_enable || flashConfig.api_key[0]==0) if (!flashConfig.rssi_enable || !flashConfig.tcp_enable || flashConfig.api_key[0]==0)
return; return;
sint8 rssi = wifi_station_get_rssi(); sint8 rssi = wifi_station_get_rssi();
os_printf("timer rssi=%d\n", rssi); os_printf("timer rssi=%d\n", rssi);
if (rssi >= 0) return; // not connected or other error if (rssi >= 0) return; // not connected or other error
// compose TCP command // compose TCP command
uint8_t chan = MAX_TCP_CHAN-1; uint8_t chan = MAX_TCP_CHAN-1;
tcpClientCommand(chan, 'T', "grovestreams.com:80"); tcpClientCommand(chan, 'T', "grovestreams.com:80");
// compose http header // compose http header
char buf[1024]; char buf[1024];
int hdrLen = os_sprintf(buf, int hdrLen = os_sprintf(buf,
"PUT /api/feed?api_key=%s HTTP/1.0\r\n" "PUT /api/feed?api_key=%s HTTP/1.0\r\n"
"Content-Type: application/json\r\n" "Content-Type: application/json\r\n"
"Content-Length: XXXXX\r\n\r\n", "Content-Length: XXXXX\r\n\r\n",
flashConfig.api_key); flashConfig.api_key);
// http body // http body
int dataLen = os_sprintf(buf+hdrLen, int dataLen = os_sprintf(buf+hdrLen,
"[{\"compId\":\"%s\", \"streamId\":\"%s\", \"data\":%d}]\r", "[{\"compId\":\"%s\", \"streamId\":\"%s\", \"data\":%d}]\r",
flashConfig.hostname, GS_STREAM, rssi); flashConfig.hostname, GS_STREAM, rssi);
buf[hdrLen+dataLen++] = 0; buf[hdrLen+dataLen++] = 0;
buf[hdrLen+dataLen++] = '\n'; buf[hdrLen+dataLen++] = '\n';
// hackish way to fill in the content-length // hackish way to fill in the content-length
os_sprintf(buf+hdrLen-9, "%5d", dataLen); os_sprintf(buf+hdrLen-9, "%5d", dataLen);
buf[hdrLen-4] = '\r'; // fix-up the \0 inserted by sprintf (hack!) buf[hdrLen-4] = '\r'; // fix-up the \0 inserted by sprintf (hack!)
// send the request off and forget about it... // send the request off and forget about it...
for (short i=0; i<hdrLen+dataLen; i++) { for (short i=0; i<hdrLen+dataLen; i++) {
tcpClientSendChar(chan, buf[i]); tcpClientSendChar(chan, buf[i]);
} }
tcpClientSendPush(chan); tcpClientSendPush(chan);
} }
//===== Init status stuff //===== Init status stuff
void ICACHE_FLASH_ATTR statusInit(void) { void ICACHE_FLASH_ATTR statusInit(void) {
if (flashConfig.conn_led_pin >= 0) { if (flashConfig.conn_led_pin >= 0) {
makeGpio(flashConfig.conn_led_pin); makeGpio(flashConfig.conn_led_pin);
setLed(1); setLed(1);
} }
os_printf("CONN led=%d\n", flashConfig.conn_led_pin); os_printf("CONN led=%d\n", flashConfig.conn_led_pin);
os_timer_disarm(&ledTimer); os_timer_disarm(&ledTimer);
os_timer_setfn(&ledTimer, ledTimerCb, NULL); os_timer_setfn(&ledTimer, ledTimerCb, NULL);
os_timer_arm(&ledTimer, 2000, 0); os_timer_arm(&ledTimer, 2000, 0);
os_timer_disarm(&rssiTimer); os_timer_disarm(&rssiTimer);
os_timer_setfn(&rssiTimer, rssiTimerCb, NULL); os_timer_setfn(&rssiTimer, rssiTimerCb, NULL);
os_timer_arm(&rssiTimer, RSSI_INTERVAL, 1); // recurring timer os_timer_arm(&rssiTimer, RSSI_INTERVAL, 1); // recurring timer
} }

@ -20,7 +20,7 @@
<script type="text/javascript">console_url = "/console/text"</script> <script type="text/javascript">console_url = "/console/text"</script>
<script src="console.js"></script> <script src="console.js"></script>
<script type="text/javascript"> <script type="text/javascript">
var rates = [57600, 115200, 230400, 460800]; var rates = [9600, 57600, 115200, 250000];
onLoad(function() { onLoad(function() {
fetchText(100, true); fetchText(100, true);

@ -521,7 +521,7 @@ static void ICACHE_FLASH_ATTR httpdDisconCb(void *arg) {
} }
// Callback indicating a failure in the connection. "Recon" is probably intended in the sense // Callback indicating a failure in the connection. "Recon" is probably intended in the sense
// of "you need to reconnect". Sigh... Note that there is no DiconCb after ReconCb // of "you need to reconnect". Sigh... Note that there is no DisconCb after ReconCb
static void ICACHE_FLASH_ATTR httpdReconCb(void *arg, sint8 err) { static void ICACHE_FLASH_ATTR httpdReconCb(void *arg, sint8 err) {
debugConn(arg, "httpdReconCb"); debugConn(arg, "httpdReconCb");
HttpdConnData *conn = httpdFindConnData(arg); HttpdConnData *conn = httpdFindConnData(arg);

@ -1,20 +1,4 @@
#ifndef _USER_CONFIG_H_ #ifndef _USER_CONFIG_H_
#define _USER_CONFIG_H_ #define _USER_CONFIG_H_
#define MQTT_RECONNECT_TIMEOUT 5 // seconds
#define MQTT_BUF_SIZE 1024
#define MQTT_HOST "10.0.0.220" // "mqtt.yourdomain.com" or ip "10.0.0.1"
#define MQTT_PORT 1883
#define MQTT_SECURITY 0
#define MQTT_CLIENT_ID "esp-link" // ""
#define MQTT_USER ""
#define MQTT_PASS ""
#define MQTT_KEEPALIVE 120 // seconds
#define MQTT_CLSESSION true
#define PROTOCOL_NAMEv31 // MQTT version 3.1 compatible with Mosquitto v0.15
//PROTOCOL_NAMEv311 // MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/
#endif #endif

File diff suppressed because it is too large Load Diff

@ -27,118 +27,101 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#ifndef USER_AT_MQTT_H_ #ifndef MQTT_H_
#define USER_AT_MQTT_H_ #define MQTT_H_
#include <esp8266.h>
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "queue.h" #include "pktbuf.h"
#include <rest.h>
typedef struct mqtt_event_data_t { // in rest.c
uint8_t type; uint8_t UTILS_StrToIP(const char* str, void *ip);
const char* topic;
const char* data;
uint16_t topic_length;
uint16_t data_length;
uint16_t data_offset;
} mqtt_event_data_t;
typedef struct mqtt_state_t {
uint16_t port;
int auto_reconnect;
mqtt_connect_info_t* connect_info;
uint8_t* in_buffer;
uint8_t* out_buffer;
int in_buffer_length;
int out_buffer_length;
uint16_t message_length;
uint16_t message_length_read;
mqtt_message_t* outbound_message;
mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
} mqtt_state_t;
// State of MQTT connection
typedef enum { typedef enum {
WIFI_INIT, MQTT_DISCONNECTED, // we're in disconnected state
WIFI_CONNECTING, TCP_RECONNECT_REQ, // connect failed, needs reconnecting
WIFI_CONNECTING_ERROR, TCP_CONNECTING, // in TCP connection process
WIFI_CONNECTED, MQTT_CONNECTED, // conneted (or connecting)
DNS_RESOLVE,
TCP_DISCONNECTED,
TCP_RECONNECT_REQ,
TCP_RECONNECT,
TCP_CONNECTING,
TCP_CONNECTING_ERROR,
TCP_CONNECTED,
MQTT_CONNECT_SEND,
MQTT_CONNECT_SENDING,
MQTT_SUBSCIBE_SEND,
MQTT_SUBSCIBE_SENDING,
MQTT_DATA,
MQTT_PUBLISH_RECV,
MQTT_PUBLISHING
} tConnState; } tConnState;
// Simple notification callback
typedef void (*MqttCallback)(uint32_t* args); typedef void (*MqttCallback)(uint32_t* args);
typedef void (*MqttDataCallback)(uint32_t* args, const char* topic, uint32_t topic_len, const char* data, uint32_t lengh); // Callback with data messge
typedef void (*MqttDataCallback)(uint32_t* args, const char* topic, uint32_t topic_len,
const char* data, uint32_t data_len);
// MQTTY client data structure
typedef struct { typedef struct {
struct espconn* pCon; struct espconn* pCon; // socket
uint8_t security; // connection information
char* host; char* host; // MQTT server
uint32_t port; uint16_t port;
ip_addr_t ip; uint8_t security; // 0=tcp, 1=ssl
mqtt_state_t mqtt_state; ip_addr_t ip; // MQTT server IP address
mqtt_connect_info_t connect_info; mqtt_connect_info_t connect_info; // info to connect/reconnect
MqttCallback connectedCb; // protocol state and message assembly
MqttCallback cmdConnectedCb; tConnState connState; // connection state
MqttCallback disconnectedCb; bool sending; // espconn_send is pending
MqttCallback cmdDisconnectedCb; mqtt_connection_t mqtt_connection; // message assembly descriptor
MqttCallback tcpDisconnectedCb; PktBuf* msgQueue; // queued outbound messages
MqttCallback cmdTcpDisconnectedCb; // TCP input buffer
MqttCallback publishedCb; uint8_t* in_buffer;
MqttCallback cmdPublishedCb; int in_buffer_size; // length allocated
MqttDataCallback dataCb; int in_buffer_filled; // number of bytes held
MqttDataCallback cmdDataCb; // outstanding message when we expect an ACK
ETSTimer mqttTimer; PktBuf* pending_buffer; // buffer sent and awaiting ACK
uint32_t keepAliveTick; PktBuf* sending_buffer; // buffer sent not awaiting ACK
uint32_t reconnectTick; // timer and associated timeout counters
uint32_t sendTimeout; ETSTimer mqttTimer; // timer for this connection
tConnState connState; uint8_t keepAliveTick; // seconds 'til keep-alive is required (0=no k-a)
QUEUE msgQueue; uint8_t keepAliveAckTick; // seconds 'til keep-alive ack is overdue (0=no k-a)
void* user_data; uint8_t timeoutTick; // seconds 'til other timeout
uint8_t sendTimeout; // value of send timeout setting
// callbacks
MqttCallback connectedCb;
MqttCallback cmdConnectedCb;
MqttCallback disconnectedCb;
MqttCallback cmdDisconnectedCb;
MqttCallback tcpDisconnectedCb;
MqttCallback cmdTcpDisconnectedCb;
MqttCallback publishedCb;
MqttCallback cmdPublishedCb;
MqttDataCallback dataCb;
MqttDataCallback cmdDataCb;
// misc
void* user_data;
} MQTT_Client; } MQTT_Client;
#define SEC_NONSSL 0 // Initialize client data structure
#define SEC_SSL 1 void MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port,
uint8_t security, uint8_t sendTimeout,
char* client_id, char* client_user, char* client_pass,
uint8_t keepAliveTime, uint8_t cleanSession);
// Set Last Will Topic on client, must be called before MQTT_InitConnection
void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg,
uint8_t will_qos, uint8_t will_retain);
// Kick of a persistent connection to the broker, will reconnect anytime conn breaks
void MQTT_Connect(MQTT_Client* mqttClient);
// Kill persistent connection
void MQTT_Disconnect(MQTT_Client* mqttClient);
#define MQTT_FLAG_CONNECTED 1 // Subscribe to a topic
#define MQTT_FLAG_READY 2 bool MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos);
#define MQTT_FLAG_EXIT 4
#define MQTT_EVENT_TYPE_NONE 0 // Publish a message
#define MQTT_EVENT_TYPE_CONNECTED 1 bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data,
#define MQTT_EVENT_TYPE_DISCONNECTED 2 uint8_t qos, uint8_t retain);
#define MQTT_EVENT_TYPE_SUBSCRIBED 3
#define MQTT_EVENT_TYPE_UNSUBSCRIBED 4
#define MQTT_EVENT_TYPE_PUBLISH 5
#define MQTT_EVENT_TYPE_PUBLISHED 6
#define MQTT_EVENT_TYPE_EXITED 7
#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8
void MQTT_InitConnection(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security); // Callback when connected
void MQTT_InitClient(MQTT_Client* mqttClient, char* client_id, char* client_user, char* client_pass, uint8_t keepAliveTime, uint8_t cleanSession);
void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, uint8_t will_qos, uint8_t will_retain);
void MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb); void MQTT_OnConnected(MQTT_Client* mqttClient, MqttCallback connectedCb);
// Callback when disconnected
void MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb); void MQTT_OnDisconnected(MQTT_Client* mqttClient, MqttCallback disconnectedCb);
// Callback when publish succeeded
void MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb); void MQTT_OnPublished(MQTT_Client* mqttClient, MqttCallback publishedCb);
// Callback when data arrives for subscription
void MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb); void MQTT_OnData(MQTT_Client* mqttClient, MqttDataCallback dataCb);
bool MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos);
void MQTT_Connect(MQTT_Client* mqttClient);
void MQTT_Disconnect(MQTT_Client* mqttClient);
bool MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t qos, uint8_t retain);
#endif /* USER_AT_MQTT_H_ */ #endif /* USER_AT_MQTT_H_ */

@ -29,6 +29,7 @@
* *
*/ */
#include <esp8266.h>
#include "mqtt_msg.h" #include "mqtt_msg.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 3 #define MQTT_MAX_FIXED_HEADER_SIZE 3
@ -129,7 +130,7 @@ mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_le
} }
int ICACHE_FLASH_ATTR int ICACHE_FLASH_ATTR
mqtt_get_total_length(uint8_t* buffer, uint16_t length) { mqtt_get_total_length(const uint8_t* buffer, uint16_t length) {
int i; int i;
int totlen = 0; int totlen = 0;
@ -146,7 +147,7 @@ mqtt_get_total_length(uint8_t* buffer, uint16_t length) {
} }
const char* ICACHE_FLASH_ATTR const char* ICACHE_FLASH_ATTR
mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) { mqtt_get_publish_topic(const uint8_t* buffer, uint16_t* length) {
int i; int i;
int totlen = 0; int totlen = 0;
int topiclen; int topiclen;
@ -173,7 +174,7 @@ mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) {
} }
const char* ICACHE_FLASH_ATTR const char* ICACHE_FLASH_ATTR
mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) { mqtt_get_publish_data(const uint8_t* buffer, uint16_t* length) {
int i; int i;
int totlen = 0; int totlen = 0;
int topiclen; int topiclen;
@ -215,7 +216,7 @@ mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) {
} }
uint16_t ICACHE_FLASH_ATTR uint16_t ICACHE_FLASH_ATTR
mqtt_get_id(uint8_t* buffer, uint16_t length) { mqtt_get_id(const uint8_t* buffer, uint16_t length) {
if (length < 1) if (length < 1)
return 0; return 0;

@ -30,8 +30,9 @@
*/ */
#ifndef MQTT_MSG_H #ifndef MQTT_MSG_H
#define MQTT_MSG_H #define MQTT_MSG_H
#include <esp8266.h>
#define PROTOCOL_NAMEv311
enum mqtt_message_type { enum mqtt_message_type {
MQTT_MSG_TYPE_CONNECT = 1, MQTT_MSG_TYPE_CONNECT = 1,
@ -50,21 +51,22 @@ enum mqtt_message_type {
MQTT_MSG_TYPE_DISCONNECT = 14 MQTT_MSG_TYPE_DISCONNECT = 14
}; };
// Descriptor for a serialized MQTT message, this is returned by functions that compose a message
// (It's really an MQTT packet in v3.1.1 terminology)
typedef struct mqtt_message { typedef struct mqtt_message {
uint8_t* data; uint8_t* data;
uint16_t length; uint16_t length;
} mqtt_message_t; } mqtt_message_t;
// Descriptor for a connection with message assembly storage
typedef struct mqtt_connection { typedef struct mqtt_connection {
mqtt_message_t message; mqtt_message_t message; // resulting message
uint16_t message_id; // id of assembled message and memo to calculate next message id
uint16_t message_id; uint8_t* buffer; // buffer for assembling messages
uint8_t* buffer; uint16_t buffer_length; // buffer length
uint16_t buffer_length;
} mqtt_connection_t; } mqtt_connection_t;
// Descriptor for a connect request
typedef struct mqtt_connect_info { typedef struct mqtt_connect_info {
char* client_id; char* client_id;
char* username; char* username;
@ -75,32 +77,40 @@ typedef struct mqtt_connect_info {
uint8_t will_qos; uint8_t will_qos;
uint8_t will_retain; uint8_t will_retain;
uint8_t clean_session; uint8_t clean_session;
} mqtt_connect_info_t; } mqtt_connect_info_t;
static inline int ICACHE_FLASH_ATTR mqtt_get_type(const uint8_t* buffer) {
static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) {
return (buffer[0] & 0xf0) >> 4; return (buffer[0] & 0xf0) >> 4;
} }
static inline int ICACHE_FLASH_ATTR mqtt_get_dup(uint8_t* buffer) { static inline int ICACHE_FLASH_ATTR mqtt_get_dup(const uint8_t* buffer) {
return (buffer[0] & 0x08) >> 3; return (buffer[0] & 0x08) >> 3;
} }
static inline int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t* buffer) { static inline int ICACHE_FLASH_ATTR mqtt_get_qos(const uint8_t* buffer) {
return (buffer[0] & 0x06) >> 1; return (buffer[0] & 0x06) >> 1;
} }
static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { static inline int ICACHE_FLASH_ATTR mqtt_get_retain(const uint8_t* buffer) {
return (buffer[0] & 0x01); return (buffer[0] & 0x01);
} }
// Init a connection descriptor
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
int mqtt_get_total_length(uint8_t* buffer, uint16_t length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
// Returns the total length of a message including MQTT fixed header
int mqtt_get_total_length(const uint8_t* buffer, uint16_t length);
// Return pointer to topic, length in in/out param: in=length of buffer, out=length of topic
const char* mqtt_get_publish_topic(const uint8_t* buffer, uint16_t* length);
// Return pointer to data, length in in/out param: in=length of buffer, out=length of data
const char* mqtt_get_publish_data(const uint8_t* buffer, uint16_t* length);
// Return message id
uint16_t mqtt_get_id(const uint8_t* buffer, uint16_t length);
// The following functions construct an outgoing message
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id);

@ -0,0 +1,64 @@
// Copyright 2015 by Thorsten von Eicken, see LICENSE.txt
#include <esp8266.h>
#include "pktbuf.h"
void ICACHE_FLASH_ATTR
PktBuf_Print(PktBuf *buf) {
os_printf("PktBuf:");
for (int i=-16; i<0; i++)
os_printf(" %02X", ((uint8_t*)buf)[i]);
os_printf(" %p", buf);
for (int i=0; i<16; i++)
os_printf(" %02X", ((uint8_t*)buf)[i]);
os_printf("\n");
os_printf("PktBuf: next=%p len=0x%04x\n",
((void**)buf)[-4], ((uint16_t*)buf)[-6]);
}
PktBuf * ICACHE_FLASH_ATTR
PktBuf_New(uint16_t length) {
PktBuf *buf = os_zalloc(length+sizeof(PktBuf));
buf->next = NULL;
buf->filled = 0;
//os_printf("PktBuf_New: %p l=%d->%d d=%p\n",
// buf, length, length+sizeof(PktBuf), buf->data);
return buf;
}
PktBuf * ICACHE_FLASH_ATTR
PktBuf_Push(PktBuf *headBuf, PktBuf *buf) {
if (headBuf == NULL) {
//os_printf("PktBuf_Push: %p\n", buf);
return buf;
}
PktBuf *h = headBuf;
while (h->next != NULL) h = h->next;
h->next = buf;
//os_printf("PktBuf_Push: %p->..->%p\n", headBuf, buf);
return headBuf;
}
PktBuf * ICACHE_FLASH_ATTR
PktBuf_Unshift(PktBuf *headBuf, PktBuf *buf) {
buf->next = headBuf;
//os_printf("PktBuf_Unshift: %p->%p\n", buf, buf->next);
return buf;
}
PktBuf * ICACHE_FLASH_ATTR
PktBuf_Shift(PktBuf *headBuf) {
PktBuf *buf = headBuf->next;
headBuf->next = NULL;
//os_printf("PktBuf_Shift: (%p)->%p\n", headBuf, buf);
return buf;
}
PktBuf * ICACHE_FLASH_ATTR
PktBuf_ShiftFree(PktBuf *headBuf) {
PktBuf *buf = headBuf->next;
//os_printf("PktBuf_ShiftFree: (%p)->%p\n", headBuf, buf);
os_free(headBuf);
return buf;
}

@ -0,0 +1,29 @@
// Copyright 2015 by Thorsten von Eicken, see LICENSE.txt
#ifndef PKTBUF_H
#define PKTBUF_H
typedef struct PktBuf {
struct PktBuf *next; // next buffer in chain
uint16_t filled; // number of bytes filled in buffer
uint8_t data[0]; // data in buffer
} PktBuf;
// Allocate a new packet buffer of given length
PktBuf *PktBuf_New(uint16_t length);
// Append a buffer to the end of a packet buffer queue, returns new head
PktBuf *PktBuf_Push(PktBuf *headBuf, PktBuf *buf);
// Prepend a buffer to the beginning of a packet buffer queue, return new head
PktBuf * PktBuf_Unshift(PktBuf *headBuf, PktBuf *buf);
// Shift first buffer off queue, returns new head (not shifted buffer!)
PktBuf *PktBuf_Shift(PktBuf *headBuf);
// Shift first buffer off queue, free it, return new head
PktBuf *PktBuf_ShiftFree(PktBuf *headBuf);
void PktBuf_Print(PktBuf *buf);
#endif

@ -1,86 +0,0 @@
#include "proto.h"
int8_t ICACHE_FLASH_ATTR
PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize) {
parser->buf = buf;
parser->bufSize = bufSize;
parser->dataLen = 0;
parser->callback = completeCallback;
parser->isEsc = 0;
return 0;
}
int8_t ICACHE_FLASH_ATTR
PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value) {
switch (value) {
case 0x7D:
parser->isEsc = 1;
break;
case 0x7E:
parser->dataLen = 0;
parser->isEsc = 0;
parser->isBegin = 1;
break;
case 0x7F:
if (parser->callback != NULL)
parser->callback();
parser->isBegin = 0;
return 0;
break;
default:
if (parser->isBegin == 0) break;
if (parser->isEsc) {
value ^= 0x20;
parser->isEsc = 0;
}
if (parser->dataLen < parser->bufSize)
parser->buf[parser->dataLen++] = value;
break;
}
return -1;
}
int16_t ICACHE_FLASH_ATTR
PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen) {
uint8_t c;
PROTO_PARSER proto;
PROTO_Init(&proto, NULL, bufOut, maxBufLen);
while (RINGBUF_Get(rb, &c) == 0) {
if (PROTO_ParseByte(&proto, c) == 0) {
*len = proto.dataLen;
return 0;
}
}
return -1;
}
int16_t ICACHE_FLASH_ATTR
PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len) {
uint16_t i = 2;
if (RINGBUF_Put(rb, 0x7E) == -1) return -1;
while (len--) {
switch (*packet) {
case 0x7D:
case 0x7E:
case 0x7F:
if (RINGBUF_Put(rb, 0x7D) == -1) return -1;
if (RINGBUF_Put(rb, *packet++ ^ 0x20) == -1) return -1;
i += 2;
break;
default:
if (RINGBUF_Put(rb, *packet++) == -1) return -1;
i++;
break;
}
}
if (RINGBUF_Put(rb, 0x7F) == -1) return -1;
return i;
}

@ -1,21 +0,0 @@
#ifndef _PROTO_H_
#define _PROTO_H_
#include <esp8266.h>
#include "ringbuf.h"
typedef void (PROTO_PARSE_CALLBACK)();
typedef struct {
uint8_t* buf;
uint16_t bufSize;
uint16_t dataLen;
uint8_t isEsc;
uint8_t isBegin;
PROTO_PARSE_CALLBACK* callback;
} PROTO_PARSER;
int8_t PROTO_Init(PROTO_PARSER* parser, PROTO_PARSE_CALLBACK* completeCallback, uint8_t* buf, uint16_t bufSize);
int16_t PROTO_AddRb(RINGBUF* rb, const uint8_t* packet, int16_t len);
int8_t PROTO_ParseByte(PROTO_PARSER* parser, uint8_t value);
int16_t PROTO_ParseRb(RINGBUF* rb, uint8_t* bufOut, uint16_t* len, uint16_t maxBufLen);
#endif

@ -1,53 +0,0 @@
/* str_queue.c
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "queue.h"
void ICACHE_FLASH_ATTR
QUEUE_Init(QUEUE* queue, int bufferSize) {
queue->buf = (uint8_t*)os_zalloc(bufferSize);
RINGBUF_Init(&queue->rb, queue->buf, bufferSize);
}
int32_t ICACHE_FLASH_ATTR
QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len) {
return PROTO_AddRb(&queue->rb, buffer, len);
}
int32_t ICACHE_FLASH_ATTR
QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen) {
return PROTO_ParseRb(&queue->rb, buffer, len, maxLen);
}
bool ICACHE_FLASH_ATTR
QUEUE_IsEmpty(QUEUE* queue) {
if (queue->rb.fill_cnt <= 0)
return TRUE;
return FALSE;
}

@ -1,46 +0,0 @@
/* str_queue.h --
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef USER_QUEUE_H_
#define USER_QUEUE_H_
#include <esp8266.h>
#include "proto.h"
#include "ringbuf.h"
typedef struct {
uint8_t* buf;
RINGBUF rb;
} QUEUE;
void QUEUE_Init(QUEUE* queue, int bufferSize);
int32_t QUEUE_Puts(QUEUE* queue, uint8_t* buffer, uint16_t len);
int32_t QUEUE_Gets(QUEUE* queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen);
bool QUEUE_IsEmpty(QUEUE* queue);
#endif /* USER_QUEUE_H_ */

@ -1,63 +0,0 @@
#include "ringbuf.h"
/**
* \brief init a RINGBUF object
* \param r pointer to a RINGBUF object
* \param buf pointer to a byte array
* \param size size of buf
* \return 0 if successfull, otherwise failed
*/
int16_t ICACHE_FLASH_ATTR
RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size) {
if (r == NULL || buf == NULL || size < 2) return -1;
r->p_o = r->p_r = r->p_w = buf;
r->fill_cnt = 0;
r->size = size;
return 0;
}
/**
* \brief put a character into ring buffer
* \param r pointer to a ringbuf object
* \param c character to be put
* \return 0 if successfull, otherwise failed
*/
int16_t ICACHE_FLASH_ATTR
RINGBUF_Put(RINGBUF* r, uint8_t c) {
if (r->fill_cnt >= r->size)return -1; // ring buffer is full, this should be atomic operation
r->fill_cnt++; // increase filled slots count, this should be atomic operation
*r->p_w++ = c; // put character into buffer
if (r->p_w >= r->p_o + r->size) // rollback if write pointer go pass
r->p_w = r->p_o; // the physical boundary
return 0;
}
/**
* \brief get a character from ring buffer
* \param r pointer to a ringbuf object
* \param c read character
* \return 0 if successfull, otherwise failed
*/
int16_t ICACHE_FLASH_ATTR
RINGBUF_Get(RINGBUF* r, uint8_t* c) {
if (r->fill_cnt <= 0)return -1; // ring buffer is empty, this should be atomic operation
r->fill_cnt--; // decrease filled slots count
*c = *r->p_r++; // get the character out
if (r->p_r >= r->p_o + r->size) // rollback if write pointer go pass
r->p_r = r->p_o; // the physical boundary
return 0;
}

@ -1,17 +0,0 @@
#ifndef _RING_BUF_H_
#define _RING_BUF_H_
#include <esp8266.h>
typedef struct {
uint8_t* p_o; /**< Original pointer */
uint8_t* volatile p_r; /**< Read pointer */
uint8_t* volatile p_w; /**< Write pointer */
volatile int32_t fill_cnt; /**< Number of filled slots */
int32_t size; /**< Buffer size */
} RINGBUF;
int16_t RINGBUF_Init(RINGBUF* r, uint8_t* buf, int32_t size);
int16_t RINGBUF_Put(RINGBUF* r, uint8_t c);
int16_t RINGBUF_Get(RINGBUF* r, uint8_t* c);
#endif

@ -273,6 +273,7 @@ serbridgeSentCb(void *arg) {
// Error callback (it's really poorly named, it's not a "connection reconnected" callback, // Error callback (it's really poorly named, it's not a "connection reconnected" callback,
// it's really a "connection broken, please reconnect" callback) // it's really a "connection broken, please reconnect" callback)
// Note that there is no DisconCb after a ReconCb
static void ICACHE_FLASH_ATTR serbridgeReconCb(void *arg, sint8 err) { static void ICACHE_FLASH_ATTR serbridgeReconCb(void *arg, sint8 err) {
serbridgeConnData *sbConn = serbridgeFindConnData(arg); serbridgeConnData *sbConn = serbridgeFindConnData(arg);
if (sbConn == NULL) return; if (sbConn == NULL) return;

@ -6,4 +6,3 @@ void serledInit(void);
void makeGpio(uint8_t pin); void makeGpio(uint8_t pin);
#endif #endif

@ -23,7 +23,7 @@
#include "user_interface.h" #include "user_interface.h"
#include "uart.h" #include "uart.h"
#define recvTaskPrio 0 #define recvTaskPrio 1
#define recvTaskQueueLen 64 #define recvTaskQueueLen 64
// UartDev is defined and initialized in rom code. // UartDev is defined and initialized in rom code.

@ -1,9 +1,39 @@
#include <esp8266.h> #include <esp8266.h>
#include <mqtt.h> #include "cgiwifi.h"
#include <cgiwifi.h> #include "mqtt.h"
MQTT_Client mqttClient; MQTT_Client mqttClient;
static ETSTimer mqttTimer;
static int once = 0;
static void ICACHE_FLASH_ATTR mqttTimerCb(void *arg) {
if (once++ > 0) return;
MQTT_Init(&mqttClient, "h.voneicken.com", 1883, 0, 2, "test1", "", "", 10, 1);
MQTT_Connect(&mqttClient);
MQTT_Subscribe(&mqttClient, "system/time", 0);
}
void ICACHE_FLASH_ATTR
wifiStateChangeCb(uint8_t status)
{
if (status == wifiGotIP) {
os_timer_disarm(&mqttTimer);
os_timer_setfn(&mqttTimer, mqttTimerCb, NULL);
os_timer_arm(&mqttTimer, 15000, 0);
}
}
// initialize the custom stuff that goes beyond esp-link
void app_init() {
wifiAddStateChangeCb(wifiStateChangeCb);
}
#if 0
MQTT_Client mqttClient;
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
mqttConnectedCb(uint32_t *args) { mqttConnectedCb(uint32_t *args) {
MQTT_Client* client = (MQTT_Client*)args; MQTT_Client* client = (MQTT_Client*)args;
@ -46,19 +76,6 @@ mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *da
os_free(dataBuf); os_free(dataBuf);
} }
void ICACHE_FLASH_ATTR
wifiStateChangeCb(uint8_t status)
{
if (status == wifiGotIP && mqttClient.connState != TCP_CONNECTING){
MQTT_Connect(&mqttClient);
}
else if (status == wifiIsDisconnected && mqttClient.connState == TCP_CONNECTING){
MQTT_Disconnect(&mqttClient);
}
}
void init() {
wifiAddStateChangeCb(wifiStateChangeCb);
MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY); MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY);
MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION); MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION);
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0); MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
@ -67,4 +84,4 @@ void init() {
MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb); MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb);
MQTT_OnPublished(&mqttClient, mqttPublishedCb); MQTT_OnPublished(&mqttClient, mqttPublishedCb);
MQTT_OnData(&mqttClient, mqttDataCb); MQTT_OnData(&mqttClient, mqttDataCb);
} #endif

Loading…
Cancel
Save