mqtt status client hook-up

pull/47/head
Thorsten von Eicken 9 years ago
parent c1ed3695a9
commit aecaa26c65
  1. 14
      Makefile
  2. 315
      cmd/tcpclient.c
  3. 16
      cmd/tcpclient.h
  4. 13
      esp-link/log.c
  5. 2
      esp-link/log.h
  6. 8
      esp-link/main.c
  7. 90
      esp-link/mqtt_client.c
  8. 69
      esp-link/status.c
  9. 8
      mqtt/mqtt.c

@ -53,6 +53,20 @@ ET_FS ?= 8m # 8Mbit flash size in esptool flash command
ET_FF ?= 80m # 80Mhz flash speed in esptool flash command ET_FF ?= 80m # 80Mhz flash speed in esptool flash command
ET_BLANK ?= 0xFE000 # where to flash blank.bin to erase wireless settings ET_BLANK ?= 0xFE000 # where to flash blank.bin to erase wireless settings
else ifeq ("$(FLASH_SIZE)","2MB")
# Manuf 0xA1 Chip 0x4015 found on wroom-02 modules
# Here we're using two partitions of approx 0.5MB because that's what's easily available in terms
# of linker scripts in the SDK. Ideally we'd use two partitions of approx 1MB, the remaining 2MB
# cannot be used for code (esp8266 limitation).
ESP_SPI_SIZE ?= 4 # 6->4MB (1MB+1MB) or 4->4MB (512KB+512KB)
ESP_FLASH_MODE ?= 0 # 0->QIO, 2->DIO
ESP_FLASH_FREQ_DIV ?= 15 # 15->80Mhz
ESP_FLASH_MAX ?= 503808 # max bin file for 512KB flash partition: 492KB
#ESP_FLASH_MAX ?= 1028096 # max bin file for 1MB flash partition: 1004KB
ET_FS ?= 16m # 16Mbit flash size in esptool flash command
ET_FF ?= 80m # 80Mhz flash speed in esptool flash command
ET_BLANK ?= 0x1FE000 # where to flash blank.bin to erase wireless settings
else else
# Winbond 25Q32 4MB flash, typ for esp-12 # Winbond 25Q32 4MB flash, typ for esp-12
# Here we're using two partitions of approx 0.5MB because that's what's easily available in terms # Here we're using two partitions of approx 0.5MB because that's what's easily available in terms

@ -1,315 +0,0 @@
// Copyright 2015 by Thorsten von Eicken, see LICENSE.txt
//
// TCP client library allowing uControllers attached to the serial port to send commands
// to open/close TCP connections and send/recv data.
// The serial protocol is described in https://gist.github.com/tve/a46c44bf1f6b42bc572e
#include <esp8266.h>
#include "config.h"
#include "uart.h"
#include "serled.h"
#include "tcpclient.h"
// max number of channels the client can open
#define MAX_CHAN MAX_TCP_CHAN
// size of tx buffer
#define MAX_TXBUF 1024
enum TcpState {
TCP_idle, // unused connection
TCP_dns, // doing gethostbyname
TCP_conn, // connecting to remote server
TCP_data, // connected
};
// Connections
typedef struct {
struct espconn *conn; // esp connection structure
esp_tcp *tcp; // esp TCP parameters
char *txBuf; // buffer to accumulate into
char *txBufSent; // buffer held by espconn
uint8_t txBufLen; // number of chars in txbuf
enum TcpState state;
} TcpConn;
static TcpConn tcpConn[MAX_CHAN];
// forward declarations
static void tcpConnFree(TcpConn* tci);
static TcpConn* tcpConnAlloc(uint8_t chan);
static void tcpDoSend(TcpConn *tci);
static void tcpConnectCb(void *arg);
static void tcpDisconCb(void *arg);
static void tcpResetCb(void *arg, sint8 err);
static void tcpSentCb(void *arg);
static void tcpRecvCb(void *arg, char *data, uint16_t len);
//===== allocate / free connections
// Allocate a new connection dynamically and return it. Returns NULL if buf alloc failed
static TcpConn* ICACHE_FLASH_ATTR
tcpConnAlloc(uint8_t chan) {
TcpConn *tci = tcpConn+chan;
if (tci->state != TCP_idle && tci->conn != NULL) return tci;
// malloc and return espconn struct
tci->conn = os_malloc(sizeof(struct espconn));
if (tci->conn == NULL) goto fail;
memset(tci->conn, 0, sizeof(struct espconn));
// malloc esp_tcp struct
tci->tcp = os_malloc(sizeof(esp_tcp));
if (tci->tcp == NULL) goto fail;
memset(tci->tcp, 0, sizeof(esp_tcp));
// common init
tci->state = TCP_dns;
tci->conn->type = ESPCONN_TCP;
tci->conn->state = ESPCONN_NONE;
tci->conn->proto.tcp = tci->tcp;
tci->tcp->remote_port = 80;
espconn_regist_connectcb(tci->conn, tcpConnectCb);
espconn_regist_reconcb(tci->conn, tcpResetCb);
espconn_regist_sentcb(tci->conn, tcpSentCb);
espconn_regist_recvcb(tci->conn, tcpRecvCb);
espconn_regist_disconcb(tci->conn, tcpDisconCb);
tci->conn->reverse = tci;
return tci;
fail:
tcpConnFree(tci);
return NULL;
}
// Free a connection dynamically.
static void ICACHE_FLASH_ATTR
tcpConnFree(TcpConn* tci) {
if (tci->conn != NULL) os_free(tci->conn);
if (tci->tcp != NULL) os_free(tci->tcp);
if (tci->txBuf != NULL) os_free(tci->txBuf);
if (tci->txBufSent != NULL) os_free(tci->txBufSent);
memset(tci, 0, sizeof(TcpConn));
}
//===== DNS
// DNS name resolution callback
static void ICACHE_FLASH_ATTR
tcpClientHostnameCb(const char *name, ip_addr_t *ipaddr, void *arg) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP dns CB (%p %p)\n", arg, tci);
if (ipaddr == NULL) {
os_printf("TCP %s not found\n", name);
} else {
os_printf("TCP %s -> %d.%d.%d.%d\n", name, IP2STR(ipaddr));
tci->tcp->remote_ip[0] = ip4_addr1(ipaddr);
tci->tcp->remote_ip[1] = ip4_addr2(ipaddr);
tci->tcp->remote_ip[2] = ip4_addr3(ipaddr);
tci->tcp->remote_ip[3] = ip4_addr4(ipaddr);
os_printf("TCP connect %d.%d.%d.%d (%p)\n", IP2STR(tci->tcp->remote_ip), tci);
if (espconn_connect(tci->conn) == ESPCONN_OK) {
tci->state = TCP_conn;
return;
}
os_printf("TCP connect failure\n");
}
// oops
tcpConnFree(tci);
}
//===== Connect / disconnect
// Connected callback
static void ICACHE_FLASH_ATTR
tcpConnectCb(void *arg) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP connect CB (%p %p)\n", arg, tci);
tci->state = TCP_data;
// send any buffered data
if (tci->txBuf != NULL && tci->txBufLen > 0) tcpDoSend(tci);
// reply to serial
char buf[6];
short l = os_sprintf(buf, "\n~@%dC\n", tci-tcpConn);
uart0_tx_buffer(buf, l);
}
// Disconnect callback
static void ICACHE_FLASH_ATTR tcpDisconCb(void *arg) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP disconnect CB (%p %p)\n", arg, tci);
// notify to serial
char buf[6];
short l = os_sprintf(buf, "\n~@%dZ\n", tci-tcpConn);
uart0_tx_buffer(buf, l);
// free
tcpConnFree(tci);
}
// Connection reset callback
static void ICACHE_FLASH_ATTR tcpResetCb(void *arg, sint8 err) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP reset CB (%p %p) err=%d\n", arg, tci, err);
// notify to serial
char buf[6];
short l = os_sprintf(buf, "\n~@%dZ\n", tci-tcpConn);
uart0_tx_buffer(buf, l);
// free
tcpConnFree(tci);
}
//===== Sending and receiving
// Send the next buffer (assumes that the connection is in a state that allows it)
static void ICACHE_FLASH_ATTR
tcpDoSend(TcpConn *tci) {
sint8 err = espconn_sent(tci->conn, (uint8*)tci->txBuf, tci->txBufLen);
if (err == ESPCONN_OK) {
// send successful
os_printf("TCP sent (%p %p)\n", tci->conn, tci);
tci->txBuf[tci->txBufLen] = 0; os_printf("TCP data: %s\n", tci->txBuf);
tci->txBufSent = tci->txBuf;
tci->txBuf = NULL;
tci->txBufLen = 0;
} else {
// send error, leave as-is and try again later...
os_printf("TCP send err (%p %p) %d\n", tci->conn, tci, err);
}
}
// Sent callback
static void ICACHE_FLASH_ATTR
tcpSentCb(void *arg) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP sent CB (%p %p)\n", arg, tci);
if (tci->txBufSent != NULL) os_free(tci->txBufSent);
tci->txBufSent = NULL;
if (tci->txBuf != NULL && tci->txBufLen == MAX_TXBUF) {
// next buffer is full, send it now
tcpDoSend(tci);
}
}
// Recv callback
static void ICACHE_FLASH_ATTR tcpRecvCb(void *arg, char *data, uint16_t len) {
struct espconn *conn = arg;
TcpConn *tci = conn->reverse;
os_printf("TCP recv CB (%p %p)\n", arg, tci);
if (tci->state == TCP_data) {
uint8_t chan;
for (chan=0; chan<MAX_CHAN && tcpConn+chan!=tci; chan++)
if (chan >= MAX_CHAN) return; // oops!?
char buf[6];
short l = os_sprintf(buf, "\n~%d", chan);
uart0_tx_buffer(buf, l);
uart0_tx_buffer(data, len);
uart0_tx_buffer("\0\n", 2);
}
serledFlash(50); // short blink on serial LED
}
void ICACHE_FLASH_ATTR
tcpClientSendChar(uint8_t chan, char c) {
TcpConn *tci = tcpConn+chan;
if (tci->state == TCP_idle) return;
if (tci->txBuf != NULL) {
// we have a buffer
if (tci->txBufLen < MAX_TXBUF) {
// buffer has space, add char and return
tci->txBuf[tci->txBufLen++] = c;
return;
} else if (tci->txBufSent == NULL) {
// we don't have a send pending, send full buffer off
if (tci->state == TCP_data) tcpDoSend(tci);
if (tci->txBuf != NULL) return; // something went wrong
} else {
// buffers all backed-up, drop char
return;
}
}
// we do not have a buffer (either didn't have one or sent it off)
// allocate one
tci->txBuf = os_malloc(MAX_TXBUF);
tci->txBufLen = 0;
if (tci->txBuf != NULL) {
tci->txBuf[tci->txBufLen++] = c;
}
}
void ICACHE_FLASH_ATTR
tcpClientSendPush(uint8_t chan) {
TcpConn *tci = tcpConn+chan;
if (tci->state != TCP_data) return; // no active connection on this channel
if (tci->txBuf == NULL || tci->txBufLen == 0) return; // no chars accumulated to send
if (tci->txBufSent != NULL) return; // already got a send in progress
tcpDoSend(tci);
}
//===== Command parsing
// Perform a TCP command: parse the command and do the right thing.
// Returns true on success.
bool ICACHE_FLASH_ATTR
tcpClientCommand(uint8_t chan, char cmd, char *cmdBuf) {
TcpConn *tci;
char *hostname;
char *port;
// copy the command so we can modify it
char buf[128];
os_strncpy(buf, cmdBuf, 128);
buf[127] = 0;
switch (cmd) {
//== TCP Connect command
case 'T':
hostname = buf;
port = hostname;
while (*port != 0 && *port != ':') port++;
if (*port != ':') break;
*port = 0;
port++;
int portInt = atoi(port);
if (portInt < 1 || portInt > 65535) break;
// allocate a connection
tci = tcpConnAlloc(chan);
if (tci == NULL) break;
tci->state = TCP_dns;
tci->tcp->remote_port = portInt;
// start the DNS resolution
os_printf("TCP %p resolving %s for chan %d (conn=%p)\n", tci, hostname, chan ,tci->conn);
ip_addr_t ip;
err_t err = espconn_gethostbyname(tci->conn, hostname, &ip, tcpClientHostnameCb);
if (err == ESPCONN_OK) {
// dns cache hit, got the IP address, fake the callback (sigh)
os_printf("TCP DNS hit\n");
tcpClientHostnameCb(hostname, &ip, tci->conn);
} else if (err != ESPCONN_INPROGRESS) {
tcpConnFree(tci);
break;
}
return true;
//== TCP Close/disconnect command
case 'C':
os_printf("TCP closing chan %d\n", chan);
tci = tcpConn+chan;
if (tci->state > TCP_idle) {
tci->state = TCP_idle; // hackish...
espconn_disconnect(tci->conn);
}
break;
}
return false;
}

@ -1,16 +0,0 @@
#ifndef __TCP_CLIENT_H__
#define __TCP_CLIENT_H__
// max number of channels the client can open
#define MAX_TCP_CHAN 8
// Parse and perform the command, cmdBuf must be null-terminated
bool tcpClientCommand(uint8_t chan, char cmd, char *cmdBuf);
// Append a character to the specified channel
void tcpClientSendChar(uint8_t chan, char c);
// Enqueue the buffered characters for transmission on the specified channel
void tcpClientSendPush(uint8_t chan);
#endif /* __TCP_CLIENT_H__ */

@ -153,6 +153,19 @@ ajaxLogDbg(HttpdConnData *connData) {
return HTTPD_CGI_DONE; return HTTPD_CGI_DONE;
} }
void ICACHE_FLASH_ATTR dumpMem(void *addr, int len) {
uint8_t *a = addr;
int off = 0;
while (off < len) {
os_printf("%p ", a);
for (int i=0; i<16 && off+i<len; i++)
os_printf(" %02x", a[i]);
os_printf(" ");
for (int i=0; i<16 && off<len; i++,off++,a++)
os_printf("%c", *a > 0x20 && *a < 0x3f ? *a : '.');
os_printf("\n");
}
}
void ICACHE_FLASH_ATTR logInit() { void ICACHE_FLASH_ATTR logInit() {
log_no_uart = flashConfig.log_mode == LOG_MODE_OFF; // ON unless set to always-off log_no_uart = flashConfig.log_mode == LOG_MODE_OFF; // ON unless set to always-off

@ -12,4 +12,6 @@ void log_uart(bool enable);
int ajaxLog(HttpdConnData *connData); int ajaxLog(HttpdConnData *connData);
int ajaxLogDbg(HttpdConnData *connData); int ajaxLogDbg(HttpdConnData *connData);
void dumpMem(void *addr, int len);
#endif #endif

@ -117,6 +117,10 @@ extern uint32_t _binary_espfs_img_start;
static char *rst_codes[] = { static char *rst_codes[] = {
"normal", "wdt reset", "exception", "soft wdt", "restart", "deep sleep", "external", "normal", "wdt reset", "exception", "soft wdt", "restart", "deep sleep", "external",
}; };
static char *flash_maps[] = {
"512KB:256/256", "256KB", "1MB:512/512", "2MB:512/512", "4MB:512/512",
"2MB:1024/1024", "4MB:1024/1024"
};
# define VERS_STR_STR(V) #V # define VERS_STR_STR(V) #V
# define VERS_STR(V) VERS_STR_STR(V) # define VERS_STR(V) VERS_STR_STR(V)
@ -167,7 +171,9 @@ void user_init(void) {
os_printf("exccause=%d epc1=0x%x epc2=0x%x epc3=0x%x excvaddr=0x%x depc=0x%x\n", os_printf("exccause=%d epc1=0x%x epc2=0x%x epc3=0x%x excvaddr=0x%x depc=0x%x\n",
rst_info->exccause, rst_info->epc1, rst_info->epc2, rst_info->epc3, rst_info->exccause, rst_info->epc1, rst_info->epc2, rst_info->epc3,
rst_info->excvaddr, rst_info->depc); rst_info->excvaddr, rst_info->depc);
os_printf("Flash map %d, chip %08X\n", system_get_flash_size_map(), spi_flash_get_id()); uint32_t fid = spi_flash_get_id();
os_printf("Flash map %s, manuf 0x%02lX chip 0x%04lX\n", flash_maps[system_get_flash_size_map()],
fid & 0xff, (fid&0xff00)|((fid>>16)&0xff));
os_printf("** esp-link ready\n"); os_printf("** esp-link ready\n");

@ -0,0 +1,90 @@
#include <esp8266.h>
#include "cgiwifi.h"
#include "config.h"
#include "mqtt.h"
MQTT_Client mqttClient;
static ETSTimer mqttTimer;
static int once = 0;
static void ICACHE_FLASH_ATTR mqttTimerCb(void *arg) {
if (once++ > 0) return;
MQTT_Init(&mqttClient, flashConfig.mqtt_hostname, flashConfig.mqtt_port, 0, 2,
flashConfig.mqtt_client, flashConfig.mqtt_username, flashConfig.mqtt_password,
60, 1);
MQTT_Connect(&mqttClient);
MQTT_Subscribe(&mqttClient, "system/time", 0);
}
void ICACHE_FLASH_ATTR
wifiStateChangeCb(uint8_t status)
{
if (status == wifiGotIP) {
os_timer_disarm(&mqttTimer);
os_timer_setfn(&mqttTimer, mqttTimerCb, NULL);
os_timer_arm(&mqttTimer, 200, 0);
}
}
// initialize the custom stuff that goes beyond esp-link
void mqtt_client_init() {
wifiAddStateChangeCb(wifiStateChangeCb);
}
#if 0
MQTT_Client mqttClient;
void ICACHE_FLASH_ATTR
mqttConnectedCb(uint32_t *args) {
MQTT_Client* client = (MQTT_Client*)args;
MQTT_Publish(client, "announce/all", "Hello World!", 0, 0);
}
void ICACHE_FLASH_ATTR
mqttDisconnectedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT Disconnected\n");
}
void ICACHE_FLASH_ATTR
mqttTcpDisconnectedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT TCP Disconnected\n");
}
void ICACHE_FLASH_ATTR
mqttPublishedCb(uint32_t *args) {
// MQTT_Client* client = (MQTT_Client*)args;
os_printf("MQTT Published\n");
}
void ICACHE_FLASH_ATTR
mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len) {
char *topicBuf = (char*)os_zalloc(topic_len + 1);
char *dataBuf = (char*)os_zalloc(data_len + 1);
// MQTT_Client* client = (MQTT_Client*)args;
os_memcpy(topicBuf, topic, topic_len);
topicBuf[topic_len] = 0;
os_memcpy(dataBuf, data, data_len);
dataBuf[data_len] = 0;
os_printf("Receive topic: %s, data: %s\n", topicBuf, dataBuf);
os_free(topicBuf);
os_free(dataBuf);
}
MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, MQTT_SECURITY);
MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, MQTT_CLSESSION);
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
MQTT_OnConnected(&mqttClient, mqttConnectedCb);
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);
MQTT_OnDisconnected(&mqttClient, mqttTcpDisconnectedCb);
MQTT_OnPublished(&mqttClient, mqttPublishedCb);
MQTT_OnData(&mqttClient, mqttDataCb);
#endif

@ -4,7 +4,9 @@
#include "config.h" #include "config.h"
#include "serled.h" #include "serled.h"
#include "cgiwifi.h" #include "cgiwifi.h"
#include "tcpclient.h" #include "mqtt.h"
extern MQTT_Client mqttClient;
//===== "CONN" LED status indication //===== "CONN" LED status indication
@ -66,51 +68,34 @@ void ICACHE_FLASH_ATTR statusWifiUpdate(uint8_t state) {
os_timer_arm(&ledTimer, 500, 0); os_timer_arm(&ledTimer, 500, 0);
} }
//===== RSSI Status update sent to GroveStreams //===== MQTT Status update
// Every minute...
#define MQTT_STATUS_INTERVAL (60*1000)
#define RSSI_INTERVAL (60*1000) static ETSTimer mqttStatusTimer;
static ETSTimer rssiTimer; static int ICACHE_FLASH_ATTR
mqttStatusMsg(char *buf) {
sint8 rssi = wifi_station_get_rssi();
if (rssi > 0) rssi = 0; // not connected or other error
//os_printf("timer rssi=%d\n", rssi);
#define GS_STREAM "rssi" // compose MQTT message
return os_sprintf(buf,
"{\"rssi\":%d, \"heap_free\":%ld}",
rssi, (unsigned long)system_get_free_heap_size());
}
// Timer callback to send an RSSI update to a monitoring system // Timer callback to send an RSSI update to a monitoring system
static void ICACHE_FLASH_ATTR rssiTimerCb(void *v) { static void ICACHE_FLASH_ATTR mqttStatusCb(void *v) {
if (!flashConfig.rssi_enable || !flashConfig.tcp_enable || flashConfig.api_key[0]==0) if (!flashConfig.mqtt_status_enable || os_strlen(flashConfig.mqtt_status_topic) == 0 ||
mqttClient.connState != MQTT_CONNECTED)
return; return;
sint8 rssi = wifi_station_get_rssi(); char buf[128];
os_printf("timer rssi=%d\n", rssi); mqttStatusMsg(buf);
if (rssi >= 0) return; // not connected or other error MQTT_Publish(&mqttClient, flashConfig.mqtt_status_topic, buf, 0, 0);
// compose TCP command
uint8_t chan = MAX_TCP_CHAN-1;
tcpClientCommand(chan, 'T', "grovestreams.com:80");
// compose http header
char buf[1024];
int hdrLen = os_sprintf(buf,
"PUT /api/feed?api_key=%s HTTP/1.0\r\n"
"Content-Type: application/json\r\n"
"Content-Length: XXXXX\r\n\r\n",
flashConfig.api_key);
// http body
int dataLen = os_sprintf(buf+hdrLen,
"[{\"compId\":\"%s\", \"streamId\":\"%s\", \"data\":%d}]\r",
flashConfig.hostname, GS_STREAM, rssi);
buf[hdrLen+dataLen++] = 0;
buf[hdrLen+dataLen++] = '\n';
// hackish way to fill in the content-length
os_sprintf(buf+hdrLen-9, "%5d", dataLen);
buf[hdrLen-4] = '\r'; // fix-up the \0 inserted by sprintf (hack!)
// send the request off and forget about it...
for (short i=0; i<hdrLen+dataLen; i++) {
tcpClientSendChar(chan, buf[i]);
}
tcpClientSendPush(chan);
} }
//===== Init status stuff //===== Init status stuff
@ -126,9 +111,9 @@ void ICACHE_FLASH_ATTR statusInit(void) {
os_timer_setfn(&ledTimer, ledTimerCb, NULL); os_timer_setfn(&ledTimer, ledTimerCb, NULL);
os_timer_arm(&ledTimer, 2000, 0); os_timer_arm(&ledTimer, 2000, 0);
os_timer_disarm(&rssiTimer); os_timer_disarm(&mqttStatusTimer);
os_timer_setfn(&rssiTimer, rssiTimerCb, NULL); os_timer_setfn(&mqttStatusTimer, mqttStatusCb, NULL);
os_timer_arm(&rssiTimer, RSSI_INTERVAL, 1); // recurring timer os_timer_arm(&mqttStatusTimer, MQTT_STATUS_INTERVAL, 1); // recurring timer
} }

@ -41,6 +41,8 @@
#include "pktbuf.h" #include "pktbuf.h"
#include "mqtt.h" #include "mqtt.h"
extern void dumpMem(void *buf, int len);
// HACK // HACK
sint8 espconn_secure_connect(struct espconn *espconn) { sint8 espconn_secure_connect(struct espconn *espconn) {
return espconn_connect(espconn); return espconn_connect(espconn);
@ -148,7 +150,7 @@ mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) {
pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled); pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled);
} }
os_printf("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n", os_printf("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%04X\n",
mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type], pending_msg_id); mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type], pending_msg_id);
switch (msg_type) { switch (msg_type) {
@ -553,8 +555,12 @@ MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint8_t q
return FALSE; return FALSE;
} }
client->mqtt_connection.message_id = msg.message_id; client->mqtt_connection.message_id = msg.message_id;
if (msg.message.data != buf->data)
os_memcpy(buf->data, msg.message.data, msg.message.length);
buf->filled = msg.message.length;
os_printf("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length); os_printf("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length);
dumpMem(buf, buf_len);
client->msgQueue = PktBuf_Push(client->msgQueue, buf); client->msgQueue = PktBuf_Push(client->msgQueue, buf);
if (!client->sending && client->pending_buffer == NULL) { if (!client->sending && client->pending_buffer == NULL) {

Loading…
Cancel
Save