// // MQTT Commands coming in from the attache microcontrollver over the serial port // #include #include "mqtt.h" #include "mqtt_client.h" #include "mqtt_cmd.h" #ifdef MQTTCMD_DBG #define DBG(format, ...) do { os_printf(format, ## __VA_ARGS__); } while(0) #else #define DBG(format, ...) do { } while(0) #endif void ICACHE_FLASH_ATTR cmdMqttConnectedCb(MQTT_Client* client) { MqttCmdCb* cb = (MqttCmdCb*)client->user_data; DBG("MQTT: Connected Cb=%p\n", (void*)cb->connectedCb); cmdResponseStart(CMD_RESP_CB, cb->connectedCb, 0); cmdResponseEnd(); } void ICACHE_FLASH_ATTR cmdMqttDisconnectedCb(MQTT_Client* client) { MqttCmdCb* cb = (MqttCmdCb*)client->user_data; DBG("MQTT: Disconnected cb=%p\n", (void*)cb->disconnectedCb); cmdResponseStart(CMD_RESP_CB, cb->disconnectedCb, 0); cmdResponseEnd(); } void ICACHE_FLASH_ATTR cmdMqttPublishedCb(MQTT_Client* client) { MqttCmdCb* cb = (MqttCmdCb*)client->user_data; DBG("MQTT: Published cb=%p\n", (void*)cb->publishedCb); cmdResponseStart(CMD_RESP_CB, cb->publishedCb, 0); cmdResponseEnd(); } void ICACHE_FLASH_ATTR cmdMqttDataCb(MQTT_Client* client, const char* topic, uint32_t topic_len, const char* data, uint32_t data_len) { MqttCmdCb* cb = (MqttCmdCb*)client->user_data; DBG("MQTT: Data cb=%p topic=%s len=%u\n", (void*)cb->dataCb, topic, data_len); cmdResponseStart(CMD_RESP_CB, cb->dataCb, 2); cmdResponseBody(topic, topic_len); cmdResponseBody(data, data_len); cmdResponseEnd(); } void ICACHE_FLASH_ATTR MQTTCMD_Lwt(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); if (cmdGetArgc(&req) != 4) return; MQTT_Client* client = &mqttClient; // free old topic & message if (client->connect_info.will_topic) os_free(client->connect_info.will_topic); if (client->connect_info.will_message) os_free(client->connect_info.will_message); uint16_t len; // get topic len = cmdArgLen(&req); if (len > 128) return; // safety check client->connect_info.will_topic = (char*)os_zalloc(len + 1); cmdPopArg(&req, client->connect_info.will_topic, len); client->connect_info.will_topic[len] = 0; // get message len = cmdArgLen(&req); if (len > 128) return; // safety check client->connect_info.will_message = (char*)os_zalloc(len + 1); cmdPopArg(&req, client->connect_info.will_message, len); client->connect_info.will_message[len] = 0; // get qos cmdPopArg(&req, (uint8_t*)&client->connect_info.will_qos, 4); // get retain cmdPopArg(&req, (uint8_t*)&client->connect_info.will_retain, 4); DBG("MQTT: MQTTCMD_Lwt topic=%s, message=%s, qos=%d, retain=%d\n", client->connect_info.will_topic, client->connect_info.will_message, client->connect_info.will_qos, client->connect_info.will_retain); // trigger a reconnect to set the LWT MQTT_Reconnect(client); } void ICACHE_FLASH_ATTR MQTTCMD_Publish(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); if (cmdGetArgc(&req) != 5) return; MQTT_Client* client = &mqttClient; uint16_t len; // get topic len = cmdArgLen(&req); if (len > 128) return; // safety check uint8_t *topic = (uint8_t*)os_zalloc(len + 1); cmdPopArg(&req, topic, len); topic[len] = 0; // get data len = cmdArgLen(&req); uint8_t *data = (uint8_t*)os_zalloc(len+1); if (!data) { // safety check os_free(topic); return; } cmdPopArg(&req, data, len); data[len] = 0; uint16_t data_len; uint8_t qos, retain; // get data length cmdPopArg(&req, &data_len, sizeof(data_len)); // get qos cmdPopArg(&req, &qos, sizeof(qos)); // get retain cmdPopArg(&req, &retain, sizeof(retain)); DBG("MQTT: MQTTCMD_Publish topic=%s, data_len=%d, qos=%d, retain=%d\n", topic, data_len, qos, retain); MQTT_Publish(client, (char*)topic, (char*)data, data_len, qos%3, retain&1); os_free(topic); os_free(data); return; } void ICACHE_FLASH_ATTR MQTTCMD_Subscribe(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); if (cmdGetArgc(&req) != 2) return; MQTT_Client* client = &mqttClient; uint16_t len; // get topic len = cmdArgLen(&req); if (len > 128) return; // safety check uint8_t* topic = (uint8_t*)os_zalloc(len + 1); cmdPopArg(&req, topic, len); topic[len] = 0; // get qos uint32_t qos = 0; cmdPopArg(&req, (uint8_t*)&qos, 4); DBG("MQTT: MQTTCMD_Subscribe topic=%s, qos=%u\n", topic, qos); MQTT_Subscribe(client, (char*)topic, (uint8_t)qos); os_free(topic); return; } void ICACHE_FLASH_ATTR MQTTCMD_Setup(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); MQTT_Client* client = &mqttClient; if (cmdGetArgc(&req) != 4) return; #if 0 if (cmdGetArgc(&req) != 9) return 0; // create mqtt client uint8_t clientLen = sizeof(MQTT_Client); MQTT_Client* client = (MQTT_Client*)os_zalloc(clientLen); if (client == NULL) return 0; os_memset(client, 0, clientLen); uint16_t len; uint8_t *client_id, *user_data, *pass_data; uint32_t keepalive, clean_session; // get client id len = cmdArgLen(&req); if (len > 32) return 0; // safety check client_id = (uint8_t*)os_zalloc(len + 1); cmdPopArg(&req, client_id, len); client_id[len] = 0; // get username len = cmdArgLen(&req); if (len > 32) return 0; // safety check user_data = (uint8_t*)os_zalloc(len + 1); cmdPopArg(&req, user_data, len); user_data[len] = 0; // get password len = cmdArgLen(&req); if (len > 32) return 0; // safety check pass_data = (uint8_t*)os_zalloc(len + 1); cmdPopArg(&req, pass_data, len); pass_data[len] = 0; // get keepalive cmdPopArg(&req, (uint8_t*)&keepalive, 4); // get clean session cmdPopArg(&req, (uint8_t*)&clean_session, 4); #ifdef MQTTCMD_DBG DBG("MQTT: MQTTCMD_Setup clientid=%s, user=%s, pw=%s, keepalive=%ld, clean_session=%ld\n", client_id, user_data, pass_data, keepalive, clean_session); #endif // init client // TODO: why malloc these all here, pass to MQTT_InitClient to be malloc'd again? MQTT_InitClient(client, (char*)client_id, (char*)user_data, (char*)pass_data, keepalive, clean_session); os_free(client_id); os_free(user_data); os_free(pass_data); #endif // create callback MqttCmdCb* callback = (MqttCmdCb*)os_zalloc(sizeof(MqttCmdCb)); cmdPopArg(&req, &callback->connectedCb, 4); cmdPopArg(&req, &callback->disconnectedCb, 4); cmdPopArg(&req, &callback->publishedCb, 4); cmdPopArg(&req, &callback->dataCb, 4); client->user_data = callback; DBG("MQTT connectedCb=%x\n", callback->connectedCb); client->cmdConnectedCb = cmdMqttConnectedCb; client->cmdDisconnectedCb = cmdMqttDisconnectedCb; client->cmdPublishedCb = cmdMqttPublishedCb; client->cmdDataCb = cmdMqttDataCb; if (client->connState == MQTT_CONNECTED) { if (callback->connectedCb) cmdMqttConnectedCb(client); } else if (callback->disconnectedCb) { cmdMqttDisconnectedCb(client); } } #if 0 uint32_t ICACHE_FLASH_ATTR MQTTCMD_Connect(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); #ifdef MQTT_1_CLIENT if (mqttClient.connState == MQTT_CONNECTED && mqttClient.cmdConnectedCb) { mqttClient.cmdConnectedCb((uint32_t*)&mqttClient); } else if (mqttClient.connState == MQTT_DISCONNECTED && mqttClient.cmdDisconnectedCb) { mqttClient.cmdDisconnectedCb((uint32_t*)&mqttClient); } return 1; #else if (cmdGetArgc(&req) != 4) return 0; // get mqtt client uint32_t client_ptr; cmdPopArg(&req, (uint8_t*)&client_ptr, 4); MQTT_Client* client = (MQTT_Client*)client_ptr; DBG("MQTT: MQTTCMD_Connect client ptr=%p\n", (void*)client_ptr); uint16_t len; // get host if (client->host) os_free(client->host); len = cmdArgLen(&req); if (len > 128) return 0; // safety check client->host = (char*)os_zalloc(len + 1); cmdPopArg(&req, client->host, len); client->host[len] = 0; // get port cmdPopArg(&req, (uint8_t*)&client->port, 4); // get security cmdPopArg(&req, (uint8_t*)&client->security, 4); DBG("MQTT: MQTTCMD_Connect host=%s, port=%d, security=%d\n", client->host, client->port, client->security); MQTT_Connect(client); return 1; #endif } uint32_t ICACHE_FLASH_ATTR MQTTCMD_Disconnect(CmdPacket *cmd) { CmdRequest req; cmdRequest(&req, cmd); #ifdef MQTT_1_CLIENT return 1; #else if (cmdGetArgc(&req) != 1) return 0; // get mqtt client uint32_t client_ptr; cmdPopArg(&req, (uint8_t*)&client_ptr, 4); MQTT_Client* client = (MQTT_Client*)client_ptr; DBG("MQTT: MQTTCMD_Disconnect client ptr=%p\n", (void*)client_ptr); // disconnect MQTT_Disconnect(client); return 1; #endif } #endif