2023年8月23日 星期三

Raspberry Pi Self-Hosted MQTT Environment for the Internet of Things

Purpose:

In this project I show how I self-hosted an MQTT environment using a Mosquitto broker running on a Raspberry Pi. ESP32 microcontrollers publish sensor data and subscribe to an LED-control topic through the broker, and the data is presented in a Node-RED UI (based on the "ESP32 MQTT Publish/Subscribe DS18B20 Temperature to Node-Red" video on my channel). Cpolar is then used so that users on the public network can access these intranet services normally.

Fundamentals:

MQTT (MQ Telemetry Transport) is a lightweight machine-to-machine communication protocol that works on top of TCP/IP and is well suited to Internet-of-Things scenarios. MQTT uses the publish-subscribe pattern, in which clients subscribe to topics and publish messages through a broker. There are many different framework implementations of MQTT as well as complete cloud services that use MQTT (Adafruit IO, CloudMQTT, ThingMQ, etc.). The cloud services are nice and easy to use, but if you don't want your data uploaded to third-party storage, you can host an MQTT environment yourself and have full control of the gathered data. Even if you decide to upload data to the cloud, the local MQTT environment can work as a gateway, so that data leaves the intranet for the Internet from one single place. This makes it easier to control and change the outgoing data flow.

Installing the Mosquitto broker on Raspberry Pi:
1. sudo apt-get update
2. sudo apt-get upgrade
3. sudo apt-get install mosquitto mosquitto-clients
To conduct a small communication test, start the subscriber and publisher client applications by opening two separate consoles in our Raspberry Pi and typing the following commands:
Console 1: mosquitto_sub -h localhost -t /test/topic
Console 2: mosquitto_pub -h localhost -t /test/topic -m "Hello"
Node-RED is a programming tool for wiring together hardware devices, APIs and online services in new and interesting ways. It provides a browser-based editor that makes it easy to wire together flows using the wide range of nodes in the palette, which can be deployed to its runtime in a single click.

Installing Node-RED on Raspberry Pi:
This command will install several things to our Raspberry Pi. These packages include Node.js, npm, and Node-RED itself.
1. bash <(curl -sL https://raw.githubusercontent.com/node-red/linux-installers/master/deb/update-nodejs-and-nodered)
2. Are you really sure you want to do this ? [y/N] ? Y
3. Would you like to install the Pi-specific nodes ? [y/N] ? Y

cpolar is a secure intranet penetration cloud service, which exposes the local server under the intranet to the public network through a secure tunnel. This enables public network users to access intranet services normally.
Intranet penetration, simply put, means that the data of the intranet can be obtained by the external network and can be mapped to the public network, so that the data of the intranet can be accessed on the public network.
Installing Cpolar on Raspberry Pi:
1. curl -sL https://git.io/cpolar | sudo bash
2. ./cpolar version
3. https://dashboard.cpolar.com/signup to register
4. ./cpolar authtoken xxxxxxxxxxxxxxxxxx
5. ./cpolar tcp 1883

Raspberry Pi is a popular embedded system board that is relatively small in size and easy to use. Its applications range from hobbyist to professional work areas.

YouTube Demo:
Code Introduction:
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <OneWire.h>
#include <DallasTemperature.h>
#include <BluetoothSerial.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>
//----------------------------------------------------------------------
// NTP settings. None of these three macros is referenced by the code visible
// in this file.
#define NTP_SERVER     "pool.ntp.org"
#define UTC_OFFSET     0
#define UTC_OFFSET_DST 0
//-----   DS18B20  ------------------
// GPIO number handed to the OneWire bus object that the DS18B20 driver uses.
#define DQ_Pin 4
OneWire oneWire(DQ_Pin);
DallasTemperature sensors(&oneWire);

// Not referenced by the visible code (processCommand() has a parameter of the
// same name that shadows this array).
byte data[12]; // buffer for data
// Filled by oneWire.search() in setup().
byte address[8]; // 64 bit device address
//float Humidity;
// Last DS18B20 reading in degrees Celsius, written in loop().
float Temperature;
// ------ Change the following to your own WiFi SSID and password ------
// All C-string settings are `const char*`: they point at string literals,
// which must never be written through (binding a literal to a plain `char*`
// is ill-formed in ISO C++11 and later).
const char* ssid = "Winson_Y52";
const char* password = "8888888888";
//---- PM2.5 --------
// Most recent PM2.5 average obtained by httpGETRequest1().
String AQIBuffer;
// Apply for an API key from the Taiwan EPA: https://data.epa.gov.tw/api-term
// Air-quality listing: https://data.epa.gov.tw/api/v2/aqx_p_432?offset=0&format=json&api_key=<your API key>
String APIkey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; // your EPA API key
//String Area = "橋頭"; // monitoring site whose air quality is requested
String Area = "新竹"; // monitoring site whose air quality is requested
String url = "https://data.epa.gov.tw/api/v2/aqx_p_432?format=json&limit=5&api_key=" + APIkey + "&filters=SiteName,EQ," + Area ; // PM2.5 query URL
// ------ Change the following to your own MQTT settings ------
//const char* MQTTServer = "mqtt.eclipseprojects.io"; // public broker, no registration needed
//const char* MQTTServer = "test.mosquitto.org";      // public broker, no registration needed
const char* MQTTServer = "1.tcp.cpolar.io";
int MQTTPort = 10521; // MQTT port
const char* MQTTUser = "";     // broker user name (empty here)
const char* MQTTPassword = ""; // broker password (empty here)
// Publish topic 1: temperature
const char* MQTTPubTopic1 = "winsondiy/ESP32/temp";
// Publish topic 2: PM2.5 average
const char* MQTTPubTopic2 = "winsondiy/ESP32/pm2.5_avg";
// Subscribe topic 1: LED control
const char* MQTTSubTopic1 = "winsondiy/ESP32/led";
// millis() timestamp of the last publish; unsigned to match millis() so the
// elapsed-time subtraction stays correct across counter wrap-around.
unsigned long MQTTLastPublishTime = 0;
// Minimum time between publishes, in milliseconds (5 seconds).
unsigned long MQTTPublishInterval = 5000;

WiFiClient WifiClient;
PubSubClient MQTTClient(WifiClient);

// The three items below are not referenced by the code visible in this file.
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
//--------- Flag structure --------------------------------------
// Collection of one-byte mode/status flags, all starting at zero.
// Only DS18B20Flag is used by the code visible in this file: processCommand()
// sets it to 0, 1 or 2 and loop() dispatches on that value.
struct _vFlag
{
  uint8_t BTFlag{0};
  uint8_t DC_Flag{0};
  uint8_t CANFlag{0};
  uint8_t I2C_Flag{0};
  uint8_t RFIDWrite{0};
  uint8_t RFIDRead{0};
  uint8_t dht22{0};
  uint8_t sensor1_Flag{0};
  uint8_t initial_Flag{0};
  uint8_t FunctionFlag{0};
  uint8_t DS18B20Flag{0};
};
using vFlag = _vFlag;

// Global flag instance, plus a pointer that nothing visible here assigns.
vFlag flag;
vFlag *flag_Ptr;
//--------- uart structure --------------------------------------
//----------uart--------------
#define LINE_BUFFER_LENGTH 64
typedef struct _vUart
{
  char c;
  int lineIndex = 0;
  int line1Index = 0;
  int BTlineIndex = 0;
  bool lineIsComment;
  bool lineSemiColon;
  char line[128];
  char BTline[20];
  String inputString;
  String BTinputString;
  String S1inputString;
  int V[16];
  char ctemp[30];
  char I2C_Data[80];
  int DC_Spped = 50;
  float Voltage[16];
  int Buffer[128];
  int StartCnt = 0;
  int ReadCnt = 0;
  int sensorValue = 0;
} vUart;
vUart *Uart_Ptr;
vUart Uart;

//---------------------------------------------------------------------------------
// Fall back to GPIO 2 when the board definition does not provide LED_BUILTIN.
#ifndef LED_BUILTIN
#define LED_BUILTIN 2
#endif
//----------------------------------------------------------------
// Task handles filled in by xTaskCreatePinnedToCore() in initial().
TaskHandle_t hled;  // handle of the LED blink task (vLEDTask)
TaskHandle_t huart; // handle of the serial command task (vUARTTask)
//------------------------------------------------------------------------------
void initial()
{
  Serial.println(F("Create Task"));
  //----------------------------------------------------------------------
  xTaskCreatePinnedToCore(
    vUARTTask, "UARTTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 3 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &huart //handle
    ,
    0);

  //--------------- create task----------------------------------
  xTaskCreatePinnedToCore(
    vLEDTask, "LEDTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 2 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &hled //handle
    ,
    0);
  //----------------------------------------
  //----------------------------------------------------------------------
  //vTaskSuspend(hfunction); //暫停TASK運行
  //----------------------------------------------------------------------
}

void setup()
{
  Serial.begin(9600);
  Serial.println(F("init"));
  initial();
  pinMode(LED_BUILTIN, OUTPUT);
  pinMode(16, OUTPUT);
  //-------------------------
  //----------------------------
  Serial.println(ssid);
  //WiFi.mode(WIFI_STA);
  //WiFi.begin(ssid, password);
  //WiFi.begin("Wokwi-GUEST", "", 6);
  WifiConnect();
  //-----------------------------------------
  //MQTTClient.setServer(MQTTServer, MQTTPort);
  //MQTTClient.setCallback(MQTTCallback);
  //------------------------------------------
 
  //---------------------------------------------
  if (oneWire.search(address))
  {
    Serial.println("Slave device found!");
    Serial.print("Device Address = ");
    Serial.println(address[0]);
  }
  else
  {
    Serial.println("Slave device not found!");
  }
  //-----DS18B20-----------
  sensors.begin();
  //dht.begin();
}

void loop()
{
  Serial.print(F("Main at core:"));
  Serial.println(xPortGetCoreID());
  while(1)
  {
    if (WiFi.status() != WL_CONNECTED)
    {
      WifiConnect();
    }
   
    if (!MQTTClient.connected())
    {
      MQTTConnect();
    }
    //如果距離上次傳輸已經超過10秒,則Publish溫度
    if ((millis() - MQTTLastPublishTime) >= MQTTPublishInterval )
    {
      //---ds18b20-----
      sensors.requestTemperatures();
      Temperature=sensors.getTempCByIndex(0);
      Serial.println(sensors.getTempCByIndex(0));
      MQTTClient.publish(MQTTPubTopic1, String(Temperature).c_str());
      Serial.println("Temperature Publish to MQTT Broker");
      //---------------------------
     
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      MQTTClient.publish(MQTTPubTopic2, String(AQIBuffer).c_str());
      Serial.println("pm2.5_avg Publish to MQTT Broker");
     
      MQTTLastPublishTime = millis();
    }
    MQTTClient.loop();//update status
    delay(50);
    //----------------------------
    if(flag.DS18B20Flag == 1)
    {
      vDS18B20Task();
    }
    if(flag.DS18B20Flag == 2)
    {
      vNodeRedTask();
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      flag.DS18B20Flag =0;
    }
   
   
  }
}
//-------------------------------------------
// FreeRTOS task: assembles characters from Serial into a command line held in
// the global `Uart` state and passes each completed line to processCommand().
//
// Per-character handling, as implemented below:
//   '\n' or '\r'  end of line; a non-empty line is NUL-terminated and executed,
//                 both skip flags are cleared and "ok>" is printed (this happens
//                 for every end-of-line character, including on empty lines).
//   '/'           dropped.
//   '~'           sets lineIsComment; following characters are dropped until a
//                 ')' or an end-of-line character.
//   ';'           sets lineSemiColon; following characters are dropped until an
//                 end-of-line character (')' does not clear this flag).
//   'a'..'z'      stored as upper case.
//   anything else stored unchanged.
// Once lineIndex reaches LINE_BUFFER_LENGTH - 1 further characters are dropped
// with an error message; lineIndex is not reset, so the already buffered part
// is still executed at the next end of line.
//
// pvParameters is unused. The task never returns.
void vUARTTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.print(F("UARTTask at core:"));
  Serial.println(xPortGetCoreID());
  vTaskDelay(100);
  for (;;)
  {
    // Drain everything currently waiting in the serial receive buffer.
    while (Serial.available() > 0)
    {
      Uart.c = Serial.read();
 
      if ((Uart.c == '\n') || (Uart.c == '\r'))
      { // End of line reached
        if (Uart.lineIndex > 0)
        { // Line is complete. Then execute!
          Uart.line[Uart.lineIndex] = '\0'; // Terminate string
          //Serial.println( F("Debug") );
          //Serial.println( Uart.inputString );
          processCommand(Uart.line); // do something with the command
          // Reset both the char buffer and its String mirror for the next line.
          Uart.lineIndex = 0;
          Uart.inputString = "";
        }
        else
        {
          // Empty or comment line. Skip block.
        }
        // Skip flags never survive past the end of a line.
        Uart.lineIsComment = false;
        Uart.lineSemiColon = false;
        Serial.println(F("ok>"));
      }
      else
      {
        //Serial.println( c );
        if ((Uart.lineIsComment) || (Uart.lineSemiColon))
        {
          // Currently skipping: drop the character. Only ')' ends a '~' comment;
          // a ';' skip lasts until end of line.
          if (Uart.c == ')')
            Uart.lineIsComment = false; // End of comment. Resume line.
        }
        else
        {
          if (Uart.c == '/')
          { // Block delete not supported. Ignore character.
          }
          else if (Uart.c == '~')
          { // Enable comments flag and ignore all characters until ')' or EOL.
            Uart.lineIsComment = true;
          }
          else if (Uart.c == ';')
          {
            // Ignore the remainder of the line.
            Uart.lineSemiColon = true;
          }
          else if (Uart.lineIndex >= LINE_BUFFER_LENGTH - 1)
          {
            // Line too long: report and drop this character. Both flags are
            // already false in this branch, so the assignments change nothing.
            Serial.println("ERROR - lineBuffer overflow");
            Uart.lineIsComment = false;
            Uart.lineSemiColon = false;
          }
          else if (Uart.c >= 'a' && Uart.c <= 'z')
          { // Upcase lowercase
            Uart.line[Uart.lineIndex] = Uart.c - 'a' + 'A';
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += (char)(Uart.c - 'a' + 'A');
          }
          else
          {
            // Store every other character unchanged.
            Uart.line[Uart.lineIndex] = Uart.c;
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += Uart.c;
          }
        }
      }
    } //while (Serial.available() > 0)
    // Give other tasks a chance to run before polling again.
    vTaskDelay(5);
  }
}
//-------------------------------------------------------------------------
static void vLEDTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.println(F("LEDTask at core:"));
  Serial.println(xPortGetCoreID());
  pinMode(LED_BUILTIN, OUTPUT);
  for (;;) // A Task shall never return or exit.
  {
    digitalWrite(LED_BUILTIN, HIGH); // turn the LED on (HIGH is the voltage level)
    vTaskDelay(200);
    digitalWrite(LED_BUILTIN, LOW); // turn the LED off by making the voltage LOW
    vTaskDelay(200);
  }
}
//----------------------------------------
void processCommand(char *data)
{
  int len, xlen, ylen, zlen, alen;
  int tempDIO;
  String stemp;

  len = Uart.inputString.length();
  //---------------------------------------
  if (strstr(data, "VER") != NULL)
  {
    Serial.println(F("ESP32_20230811"));
  }
  if (strstr(data, "DS18B20_ON") != NULL)
  {
    flag.DS18B20Flag = 1;
    Serial.println(F("DS18B20_ON"));

  }
  if (strstr(data, "NODE_RED") != NULL)
  {
    flag.DS18B20Flag = 2;
    Serial.println(F("NODE_RED"));

  }
  if (strstr(data, "DS18B20_OFF") != NULL)
  {
    flag.DS18B20Flag = 0;
    Serial.println(F("DS18B20_OFF"));

  }
 
}
//--------------------------------------------
void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Switch on the LED if an 1 was received as first character
  if ((char)payload[0] == '1')
  {
    digitalWrite(LED_BUILTIN, LOW);   // Turn the LED on (Note that LOW is the voltage level
  }
  else
  {
    digitalWrite(LED_BUILTIN, HIGH);  // Turn the LED off by making the voltage HIGH
  }

}
//-----------------------------------------
//開始WiFi連線
void WifiConnect()
{
  //開始WiFi連線
  //WiFi.begin("Wokwi-GUEST", "", 6);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED)
  {
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi連線成功");
  Serial.print("IP Address:");
  Serial.println(WiFi.localIP());
}
//----------------------------------------
void MQTTConnect()
{
  MQTTClient.setServer(MQTTServer, MQTTPort);
  MQTTClient.setCallback(MQTTCallback);
  while (!MQTTClient.connected())
  {
    //以亂數為ClietID
    String MQTTClientid = "esp32-" + String(random(1000000, 9999999));
    if (MQTTClient.connect(MQTTClientid.c_str(), MQTTUser, MQTTPassword))
    {
      //連結成功,顯示「已連線」。
      Serial.println("MQTT已連線");
      //訂閱SubTopic1主題
      MQTTClient.subscribe(MQTTSubTopic1);
    }
    else
    {
      //若連線不成功,則顯示錯誤訊息,並重新連線
      Serial.print("MQTT連線失敗,狀態碼=");
      Serial.println(MQTTClient.state());
      Serial.println("五秒後重新連線");
      delay(5000);
    }
  }
}
//------------------------------------------
//接收到訂閱時
void MQTTCallback(char* topic, byte* payload, unsigned int length)
{
  Serial.print(topic); Serial.print("訂閱通知:");
  String payloadString;//將接收的payload轉成字串
  //顯示訂閱內容
  for (int i = 0; i < length; i++)
  {
    payloadString = payloadString + (char)payload[i];
  }
  Serial.println(payloadString);
  //比對主題是否為訂閱主題1
  if (strcmp(topic, MQTTSubTopic1) == 0)
  {
    Serial.println("改變燈號:" + payloadString);
    if (payloadString == "ON")
    {
      digitalWrite(16, HIGH);
    }
    if (payloadString == "OFF")
    {
      digitalWrite(16, LOW);
    }
  }
}
//-------------------------------------------
void vNodeRedTask()
{
  //Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}
//-------------------------------------------
void vDS18B20Task()
{
  Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}

/**
 * Fetches the air-quality JSON document from `serverName` with an HTTP GET
 * and extracts records[0]["pm2.5_avg"].
 *
 * On success the value is stored in the global AQIBuffer and returned.
 * On any failure (HTTP status other than 200, JSON parse error, missing
 * field) the problem is logged and AQIBuffer is left untouched, so the
 * previously fetched value (or an empty String if there is none yet) is
 * returned instead of overwriting it with "null".
 *
 * @param serverName Full request URL.
 * @return The current content of AQIBuffer.
 */
String httpGETRequest1(const char* serverName)
{
  HTTPClient http;

  // Full URL (domain name or IP address plus path and query string).
  http.begin(serverName);

  // Send the HTTP GET request.
  const int httpResponseCode = http.GET();

  if (httpResponseCode == HTTP_CODE_OK)
  {
    const String payload = http.getString();

    // Size the JSON pool relative to the response body.
    DynamicJsonDocument AQJarray(payload.length() * 2);
    const DeserializationError parseError = deserializeJson(AQJarray, payload);
    if (parseError)
    {
      Serial.print("JSON parse error: ");
      Serial.println(parseError.c_str());
    }
    else if (AQJarray["records"][0]["pm2.5_avg"].isNull())
    {
      Serial.println("pm2.5_avg not found in response");
    }
    else
    {
      AQIBuffer = AQJarray["records"][0]["pm2.5_avg"].as<String>();
    }
  }
  else
  {
    Serial.print("Error code: ");
    Serial.println(httpResponseCode);
  }

  // Free resources.
  http.end();

  return AQIBuffer;
}

//-----------------------------------



沒有留言:

張貼留言