2023年8月19日 星期六

ESP32 MQTT Publish/Subscribe DS18B20 Temperature to Node-Red

Purpose:
利用ESP32偵測DS18B20的溫度以及接台灣環保署的天氣資訊將其訊息整合並利用公用MQTT broker的發布與訂閱Switch ON/OFF 來做LED燈號切換.
This project uses an ESP32 to read the temperature from a DS18B20 sensor and to fetch weather information from the Taiwan Environmental Protection Agency. It combines both readings, publishes them through a public MQTT broker, and subscribes to a Switch ON/OFF topic that toggles an LED.

Fundamental:
MQTT(Message Queuing Telemetry Transport)訊息佇列遙測傳輸
MQTT是一種基於「發布(Publish)∕訂閱(Subscribe)」機制的訊息傳輸協定 (MQTT is a Client Server publish/subscribe messaging transport protocol), 我們可以把它想成在公佈欄上張貼發行訂閱的機制. MQTT訊息發送端, 相當於把訊息交給公佈欄的代理人(broker), 來統籌管理發行和訂閱事宜. 每一個訊息來源都有個唯一的主題名稱.
MQTT (Message Queuing Telemetry Transport) message queue telemetry transmission
MQTT is a message transfer protocol based on a publish/subscribe mechanism (MQTT is a Client Server publish/subscribe messaging transport protocol). We can think of it as posting notices on, and subscribing to, a bulletin board. The MQTT message sender hands its message to the bulletin board's agent (the broker), which coordinates publishing and subscription. Each message source has a unique topic name.
代理人是個伺服器軟體, 向伺服器發送主題的一方是發布者(publisher), 從伺服器獲取主題的一方則是訂閱者(subscriber).

MQTT的標頭採用數字編碼,整個長度只佔2位元組,等同兩個字元,後面跟著訊息的主題(topic)和內容(payload).
MQTT Explorer
測試有沒有成功連線到 MQTT Broker. 
我們先到 MQTT Explorer去下載工具,連結如下。
http://mqtt-explorer.com/
host 設定 mqtt.eclipseprojects.io 要跟ESP32 code 一樣
Node-RED
Node-RED is a programming tool for wiring together hardware devices, APIs and online services in new and interesting ways. It provides a browser-based editor that makes it easy to wire together flows using the wide range of nodes in the palette that can be deployed to its runtime in a single-click.
Node-RED 是 IBM 以 Node.js 為基礎, 開發出來的視覺化 IOT 開發工具, 透過「流程圖」方式的操作,透過 Node-RED 完成許多後端才能做的事情.
Node-RED is a visual IOT development tool developed by IBM based on Node.js. Through the operation of "flow chart", Node-RED can accomplish many things that can only be done in the back end.
YouTube Demo:
Code  Introduction:
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <OneWire.h>
#include <DallasTemperature.h>
#include <BluetoothSerial.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>

// NTP settings.
// NOTE(review): these three macros are only referenced by a commented-out
// configTime() call in setup(); the active call passes literal values.
#define NTP_SERVER     "pool.ntp.org"
#define UTC_OFFSET     0
#define UTC_OFFSET_DST 0
//-----   DS18B20  ------------------
// Pin number handed to the OneWire bus object below.
#define DQ_Pin 4
OneWire oneWire(DQ_Pin);
// Temperature driver bound to the bus above; it takes the address of
// oneWire, so oneWire has to be declared first.
DallasTemperature sensors(&oneWire);

// NOTE(review): this global buffer is not referenced in the visible sketch
// (processCommand() has a parameter of the same name that shadows it).
byte data[12]; // buffer for data
// Filled by oneWire.search() in setup().
byte address[8]; // 64 bit device address
// Last value returned by sensors.getTempCByIndex(0) in loop().
float Temperature;
// ------ Replace with your own WiFi SSID and password ------
// Declared const: they point at string literals, which must not be modified.
const char* ssid = "your ssid";
const char* password = "your password";
//---- PM2.5 --------
// Most recent pm2.5_avg value; written by httpGETRequest1() and loop().
String AQIBuffer;

// Apply for an API key (Taiwan EPA): https://data.epa.gov.tw/api-term
// Browse the air-quality list: https://data.epa.gov.tw/api/v2/aqx_p_432?offset=0&format=json&api_key=<your API key>
String APIkey = "your API Key";
//String Area = "橋頭"; // monitoring site whose air quality is requested
String Area = "新竹"; // monitoring site whose air quality is requested
String url = "https://data.epa.gov.tw/api/v2/aqx_p_432?format=json&limit=5&api_key="
+ APIkey + "&filters=SiteName,EQ," + Area ; // PM2.5 query URL
// ------ Replace with your own MQTT settings ------
const char* MQTTServer = "mqtt.eclipseprojects.io"; // public broker, no registration needed
//const char* MQTTServer = "test.mosquitto.org"; // alternative public broker
int MQTTPort = 1883; // MQTT port
const char* MQTTUser = "";     // broker user name (empty for the public broker)
const char* MQTTPassword = ""; // broker password (empty for the public broker)
// Publish topic 1: DS18B20 temperature
const char* MQTTPubTopic1 = "winsondiy/ESP32/temp";
// Publish topic 2: PM2.5 average
const char* MQTTPubTopic2 = "winsondiy/ESP32/pm2.5_avg";
// Subscribe topic 1: LED ON/OFF switch
const char* MQTTSubTopic1 = "winsondiy/ESP32/led";
// millis() timestamp of the last publish; unsigned to match millis().
unsigned long MQTTLastPublishTime;
// Publish period in milliseconds (5000 ms = every 5 seconds).
unsigned long MQTTPublishInterval = 5000;

// TCP client used as the transport for the MQTT client below.
WiFiClient WifiClient;
// MQTT client used by loop(), MQTTConnect() and MQTTCallback().
PubSubClient MQTTClient(WifiClient);

// NOTE(review): lastMsg, msg and value are not referenced in the visible
// sketch; they look like leftovers from a PubSubClient example - confirm
// before removing.
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
//--------- Flag structure --------------------------------------
// Collection of one-byte feature flags, all starting at 0.
// Only DS18B20Flag is used in the visible sketch: processCommand() sets it
// to 1 (DS18B20_ON), 2 (NODE_RED) or 0 (DS18B20_OFF), and loop() acts on 2.
typedef struct _vFlag
{
  uint8_t BTFlag{0};
  uint8_t DC_Flag{0};
  uint8_t CANFlag{0};
  uint8_t I2C_Flag{0};
  uint8_t RFIDWrite{0};
  uint8_t RFIDRead{0};
  uint8_t dht22{0};
  uint8_t sensor1_Flag{0};
  uint8_t initial_Flag{0};
  uint8_t FunctionFlag{0};
  uint8_t DS18B20Flag{0}; // 0 = off, 1 = DS18B20_ON, 2 = NODE_RED request
} vFlag;
vFlag *flag_Ptr; // not assigned anywhere in the visible sketch
vFlag flag;      // the single global flag set
//--------- uart structure --------------------------------------
//----------uart--------------
#define LINE_BUFFER_LENGTH 64
typedef struct _vUart
{
  char c;
  int lineIndex = 0;
  int line1Index = 0;
  int BTlineIndex = 0;
  bool lineIsComment;
  bool lineSemiColon;
  char line[128];
  char BTline[20];
  String inputString;
  String BTinputString;
  String S1inputString;
  int V[16];
  char ctemp[30];
  char I2C_Data[80];
  int DC_Spped = 50;
  float Voltage[16];
  int Buffer[128];
  int StartCnt = 0;
  int ReadCnt = 0;
  int sensorValue = 0;
} vUart;
vUart *Uart_Ptr;
vUart Uart;

//---------------------------------------------------------------------------------
// Fallback for boards whose core does not define LED_BUILTIN.
#ifndef LED_BUILTIN
#define LED_BUILTIN 2
#endif
//----------------------------------------------------------------
// Handles filled in by initial() when it creates the two background tasks.
TaskHandle_t hled;  // LEDTask
TaskHandle_t huart; // UARTTask
//------------------------------------------------------------------------------
void initial()
{
  Serial.println(F("Create Task"));
  //----------------------------------------------------------------------
  xTaskCreatePinnedToCore(
    vUARTTask, "UARTTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 3 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &huart //handle
    ,
    0);

  //--------------- create task----------------------------------
  xTaskCreatePinnedToCore(
    vLEDTask, "LEDTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 2 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &hled //handle
    ,
    0);
  //----------------------------------------
  //----------------------------------------------------------------------
  //vTaskSuspend(hfunction); //暫停TASK運行
  //----------------------------------------------------------------------
}

void setup()
{
  Serial.begin(9600);
  Serial.println(F("init"));
  initial();
  pinMode(LED_BUILTIN, OUTPUT);
  pinMode(16, OUTPUT);
  //-------------------------
  //----------------------------
  Serial.println(ssid);

  WifiConnect();
  //-----------------------------------------

  //------------------------------------------
  configTime(8*3600, 0, "pool.ntp.org","time.nist.gov"); // enable NTP for Taipei time
  //configTime(UTC_OFFSET, UTC_OFFSET_DST, NTP_SERVER);
  //---------------------------------------------
  if (oneWire.search(address))
  {
    Serial.println("Slave device found!");
    Serial.print("Device Address = ");
    Serial.println(address[0]);
  }
  else
  {
    Serial.println("Slave device not found!");
  }
  //-----DS18B20-----------
  sensors.begin();

}

void loop()
{
  Serial.print(F("Main at core:"));
  Serial.println(xPortGetCoreID());
  while(1)
  {
    if (WiFi.status() != WL_CONNECTED)
    {
      WifiConnect();
    }
   
    if (!MQTTClient.connected())
    {
      MQTTConnect();
    }

    if ((millis() - MQTTLastPublishTime) >= MQTTPublishInterval )
    {
      //---ds18b20-----
      sensors.requestTemperatures();
      Temperature=sensors.getTempCByIndex(0);
      Serial.println(sensors.getTempCByIndex(0));
      MQTTClient.publish(MQTTPubTopic1, String(Temperature).c_str());
      Serial.println("Temperature Publish to MQTT Broker");
      //---------------------------
     
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      MQTTClient.publish(MQTTPubTopic2, String(AQIBuffer).c_str());
      Serial.println("pm2.5_avg Publish to MQTT Broker");
     
      MQTTLastPublishTime = millis();
    }
    MQTTClient.loop();//update status
    delay(50);
    //----------------------------
    if(flag.DS18B20Flag == 2)
    {
      vNodeRedTask();
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      flag.DS18B20Flag =0;
    }
   
   
  }
}
//-------------------------------------------
void vUARTTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.print(F("UARTTask at core:"));
  Serial.println(xPortGetCoreID());
  vTaskDelay(100);
  for (;;)
  {
    while (Serial.available() > 0)
    {
      Uart.c = Serial.read();
 
      if ((Uart.c == '\n') || (Uart.c == '\r'))
      { // End of line reached
        if (Uart.lineIndex > 0)
        { // Line is complete. Then execute!
          Uart.line[Uart.lineIndex] = '\0'; // Terminate string
          //Serial.println( F("Debug") );
          //Serial.println( Uart.inputString );
          processCommand(Uart.line); // do something with the command
          Uart.lineIndex = 0;
          Uart.inputString = "";
        }
        else
        {
          // Empty or comment line. Skip block.
        }
        Uart.lineIsComment = false;
        Uart.lineSemiColon = false;
        Serial.println(F("ok>"));
      }
      else
      {
        //Serial.println( c );
        if ((Uart.lineIsComment) || (Uart.lineSemiColon))
        {
          if (Uart.c == ')')
            Uart.lineIsComment = false; // End of comment. Resume line.
        }
        else
        {
          if (Uart.c == '/')
          { // Block delete not supported. Ignore character.
          }
          else if (Uart.c == '~')
          { // Enable comments flag and ignore all characters until ')' or EOL.
            Uart.lineIsComment = true;
          }
          else if (Uart.c == ';')
          {
            Uart.lineSemiColon = true;
          }
          else if (Uart.lineIndex >= LINE_BUFFER_LENGTH - 1)
          {
            Serial.println("ERROR - lineBuffer overflow");
            Uart.lineIsComment = false;
            Uart.lineSemiColon = false;
          }
          else if (Uart.c >= 'a' && Uart.c <= 'z')
          { // Upcase lowercase
            Uart.line[Uart.lineIndex] = Uart.c - 'a' + 'A';
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += (char)(Uart.c - 'a' + 'A');
          }
          else
          {
            Uart.line[Uart.lineIndex] = Uart.c;
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += Uart.c;
          }
        }
      }
    } //while (Serial.available() > 0)
    vTaskDelay(5);
  }
}
//-------------------------------------------------------------------------
static void vLEDTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.println(F("LEDTask at core:"));
  Serial.println(xPortGetCoreID());
  pinMode(LED_BUILTIN, OUTPUT);
  for (;;) // A Task shall never return or exit.
  {
    digitalWrite(LED_BUILTIN, HIGH); // turn the LED on (HIGH is the voltage level)
    vTaskDelay(200);
    digitalWrite(LED_BUILTIN, LOW); // turn the LED off by making the voltage LOW
    vTaskDelay(200);
  }
}
//----------------------------------------
void processCommand(char *data)
{
  int len, xlen, ylen, zlen, alen;
  int tempDIO;
  String stemp;

  len = Uart.inputString.length();
  //---------------------------------------
  if (strstr(data, "VER") != NULL)
  {
    Serial.println(F("ESP32_20230811"));
  }
  if (strstr(data, "DS18B20_ON") != NULL)
  {
    flag.DS18B20Flag = 1;
    Serial.println(F("DS18B20_ON"));

  }
  if (strstr(data, "NODE_RED") != NULL)
  {
    flag.DS18B20Flag = 2;
    Serial.println(F("NODE_RED"));

  }
  if (strstr(data, "DS18B20_OFF") != NULL)
  {
    flag.DS18B20Flag = 0;
    Serial.println(F("DS18B20_OFF"));

  }
 
}
//-----------------------------------------

//--------------------------------------------
void WifiConnect()
{

  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED)
  {
    delay(500);
    Serial.print(".");
    spinner();
  }
  Serial.println("WiFi連線成功");
  Serial.print("IP Address:");
  Serial.println(WiFi.localIP());
}
//----------------------------------------
void MQTTConnect()
{
  MQTTClient.setServer(MQTTServer, MQTTPort);
  MQTTClient.setCallback(MQTTCallback);
  while (!MQTTClient.connected())
  {
    //以亂數為ClietID
    String MQTTClientid = "esp32-" + String(random(1000000, 9999999));
    if (MQTTClient.connect(MQTTClientid.c_str(), MQTTUser, MQTTPassword))
    {
      Serial.println("MQTT已連線");
      MQTTClient.subscribe(MQTTSubTopic1);
    }
    else
    {
      Serial.print("MQTT連線失敗,狀態碼=");
      Serial.println(MQTTClient.state());
      Serial.println("五秒後重新連線");
      delay(5000);
    }
  }
}
//------------------------------------------
void MQTTCallback(char* topic, byte* payload, unsigned int length)
{
  Serial.print(topic); Serial.print("訂閱通知:");
  String payloadString;
  for (int i = 0; i < length; i++)
  {
    payloadString = payloadString + (char)payload[i];
  }
  Serial.println(payloadString);
  if (strcmp(topic, MQTTSubTopic1) == 0)
  {
    Serial.println("改變燈號:" + payloadString);
    if (payloadString == "ON")
    {
      digitalWrite(16, HIGH);
    }
    if (payloadString == "OFF")
    {
      digitalWrite(16, LOW);
    }
  }
}
//-------------------------------------------
void vNodeRedTask()
{
  //Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}
//-------------------------------------------
void vDS18B20Task()
{
  Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}

// Performs an HTTP GET on the given URL and extracts
// records[0]["pm2.5_avg"] from the JSON response.
//   serverName: full request URL (domain or IP address plus path/query)
//   returns:    the global AQIBuffer, which is updated only when the
//               request returns HTTP 200, the body parses as JSON and the
//               field is present; on any failure the previous value is
//               kept and a diagnostic is printed.
String httpGETRequest1(const char* serverName)
{
  HTTPClient http;

  // Your IP address with path or Domain name with URL path
  http.begin(serverName);

  // Send HTTP GET request
  const int httpResponseCode = http.GET();

  if (httpResponseCode == HTTP_CODE_OK)
  {
    String payload = http.getString();

    // Document capacity is sized from the response length.
    DynamicJsonDocument AQJarray(payload.length() * 2);
    const auto parseError = deserializeJson(AQJarray, payload);
    if (parseError)
    {
      Serial.print("JSON parse failed: ");
      Serial.println(parseError.c_str());
    }
    else if (AQJarray["records"][0]["pm2.5_avg"].isNull())
    {
      // Without this check a missing field would be stored as "null".
      Serial.println("pm2.5_avg missing from response");
    }
    else
    {
      AQIBuffer = AQJarray["records"][0]["pm2.5_avg"].as<String>();
    }
  }
  else
  {
    Serial.print("Error code: ");
    Serial.println(httpResponseCode);
  }
  http.end();

  return AQIBuffer;
}
//-----------------------------------



沒有留言:

張貼留言