ensure reconnect rexmits pending messages

pull/47/head
Thorsten von Eicken 9 years ago
parent 19693a11ec
commit 215da43a44
  1. 3
      esp-link/mqtt_client.c
  2. 4
      esp-link/status.c
  3. 8
      mqtt/mqtt.c
  4. 2
      mqtt/mqtt.h
  5. 28
      user/user_main.c

@ -11,8 +11,7 @@ static int once = 0;
static void ICACHE_FLASH_ATTR mqttTimerCb(void *arg) { static void ICACHE_FLASH_ATTR mqttTimerCb(void *arg) {
if (once++ > 0) return; if (once++ > 0) return;
MQTT_Init(&mqttClient, flashConfig.mqtt_hostname, flashConfig.mqtt_port, 0, 2, MQTT_Init(&mqttClient, flashConfig.mqtt_hostname, flashConfig.mqtt_port, 0, 2,
flashConfig.mqtt_client, flashConfig.mqtt_username, flashConfig.mqtt_password, flashConfig.mqtt_client, flashConfig.mqtt_username, flashConfig.mqtt_password, 60);
60, 1);
MQTT_Connect(&mqttClient); MQTT_Connect(&mqttClient);
MQTT_Subscribe(&mqttClient, "system/time", 0); MQTT_Subscribe(&mqttClient, "system/time", 0);
} }

@ -95,7 +95,9 @@ static void ICACHE_FLASH_ATTR mqttStatusCb(void *v) {
char buf[128]; char buf[128];
mqttStatusMsg(buf); mqttStatusMsg(buf);
MQTT_Publish(&mqttClient, flashConfig.mqtt_status_topic, buf, 0, 0); MQTT_Publish(&mqttClient, flashConfig.mqtt_status_topic, buf, 1, 0);
//espconn_disconnect(mqttClient.pCon);
} }
//===== Init status stuff //===== Init status stuff

@ -325,6 +325,7 @@ mqtt_timer(void* arg) {
client->msgQueue = PktBuf_Unshift(client->msgQueue, client->pending_buffer); client->msgQueue = PktBuf_Unshift(client->msgQueue, client->pending_buffer);
client->pending_buffer = NULL; client->pending_buffer = NULL;
} }
client->connect_info.clean_session = 0; // ask server to keep state
MQTT_Connect(client); MQTT_Connect(client);
} }
} }
@ -607,7 +608,7 @@ MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) {
void ICACHE_FLASH_ATTR void ICACHE_FLASH_ATTR
MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout, MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, uint8_t sendTimeout,
char* client_id, char* client_user, char* client_pass, char* client_id, char* client_user, char* client_pass,
uint8_t keepAliveTime, uint8_t cleanSession) { uint8_t keepAliveTime) {
os_printf("MQTT_Init\n"); os_printf("MQTT_Init\n");
os_memset(mqttClient, 0, sizeof(MQTT_Client)); os_memset(mqttClient, 0, sizeof(MQTT_Client));
@ -633,7 +634,7 @@ MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, uint8_t security, ui
os_strcpy(mqttClient->connect_info.password, client_pass); os_strcpy(mqttClient->connect_info.password, client_pass);
mqttClient->connect_info.keepalive = keepAliveTime; mqttClient->connect_info.keepalive = keepAliveTime;
mqttClient->connect_info.clean_session = cleanSession; mqttClient->connect_info.clean_session = 1;
mqttClient->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE); mqttClient->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE);
mqttClient->in_buffer_size = MQTT_MAX_RCV_MESSAGE; mqttClient->in_buffer_size = MQTT_MAX_RCV_MESSAGE;
@ -708,7 +709,6 @@ MQTT_Connect(MQTT_Client* mqttClient) {
mqttClient->connState = TCP_CONNECTING; mqttClient->connState = TCP_CONNECTING;
mqttClient->timeoutTick = 20; // generous timeout to allow for DNS, etc mqttClient->timeoutTick = 20; // generous timeout to allow for DNS, etc
mqttClient->sending = FALSE; mqttClient->sending = FALSE;
mqttClient->msgQueue = NULL;
} }
static void ICACHE_FLASH_ATTR static void ICACHE_FLASH_ATTR
@ -741,6 +741,8 @@ MQTT_Disconnect(MQTT_Client* mqttClient) {
return; return;
} }
mqtt_doAbort(mqttClient); mqtt_doAbort(mqttClient);
//void *out_buffer = mqttClient->mqtt_connection.buffer;
//if (out_buffer != NULL) os_free(out_buffer);
mqttClient->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect mqttClient->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
} }

@ -96,7 +96,7 @@ typedef struct {
void MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port, void MQTT_Init(MQTT_Client* mqttClient, char* host, uint32 port,
uint8_t security, uint8_t sendTimeout, uint8_t security, uint8_t sendTimeout,
char* client_id, char* client_user, char* client_pass, char* client_id, char* client_user, char* client_pass,
uint8_t keepAliveTime, uint8_t cleanSession); uint8_t keepAliveTime);
// Set Last Will Topic on client, must be called before MQTT_InitConnection // Set Last Will Topic on client, must be called before MQTT_InitConnection
void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg, void MQTT_InitLWT(MQTT_Client* mqttClient, char* will_topic, char* will_msg,

@ -2,36 +2,12 @@
#include "cgiwifi.h" #include "cgiwifi.h"
#include "mqtt.h" #include "mqtt.h"
#if 0
MQTT_Client mqttClient;
static ETSTimer mqttTimer;
static int once = 0;
static void ICACHE_FLASH_ATTR mqttTimerCb(void *arg) {
if (once++ > 0) return;
MQTT_Init(&mqttClient, "h.voneicken.com", 1883, 0, 2, "test1", "", "", 10, 1);
MQTT_Connect(&mqttClient);
MQTT_Subscribe(&mqttClient, "system/time", 0);
}
void ICACHE_FLASH_ATTR
wifiStateChangeCb(uint8_t status)
{
if (status == wifiGotIP) {
os_timer_disarm(&mqttTimer);
os_timer_setfn(&mqttTimer, mqttTimerCb, NULL);
os_timer_arm(&mqttTimer, 15000, 0);
}
}
// initialize the custom stuff that goes beyond esp-link // initialize the custom stuff that goes beyond esp-link
void app_init() { void app_init() {
wifiAddStateChangeCb(wifiStateChangeCb);
} }
#if 0
MQTT_Client mqttClient; MQTT_Client mqttClient;

Loading…
Cancel
Save