2023年8月29日 星期二

Stream Video - Raspberry Pi + USB Webcam

Purpose:

This project uses Raspberry Pi and usb webcam to do video stream. We need to create a MJPG Stream Server to be able to live stream with Raspberry Pi. This can be connected to multiple usb cameras and used as a monitoring system. 

Fundamental:

MJPG Streamer

mjpg-streamer is a command line application that copies JPEG frames from one or more input plugins to multiple output plugins. It can be used to stream JPEG files over an IP-based network from a webcam to various types of viewers such as Chrome, Firefox, Cambozola, VLC, mplayer, and other software capable of receiving MJPG streams.


Installing the mjpg-streamer on Raspberry Pi:

1. sudo apt-get update
2. sudo apt-get install libjpeg8-dev
3. sudo apt-get install imagemagick 
4. sudo apt-get install libv4l-dev
5. sudo apt-get install libjpeg-dev
6. sudo apt-get install cmake
7. Download wget https://github.com/jacksonliam/mjpg-streamer/archive/master.zip
8. unzip master.zip
9. cd mjpg-streamer
10. cd mjpg-streamer-experimental
11. make clean all
12. sudo make install 

Execute the mjpg-streamer on Raspberry Pi:
1. ls -al /dev/ |grep video  -- check if your webcam is supported on Raspberry Pi 
2. v4l2-ctl --list-devices      -- List devices for v4l2 interface
3. ./mjpg_streamer -i "/usr/local/lib/mjpg-streamer/input_uvc.so -y -d /dev/video0 -n -f 6 -r 640x480" -o "/usr/local/lib/mjpg-streamer/output_http.so -p 8084 -w /usr/local/share/mjpg-streamer/www"
4. ./mjpg_streamer -i "/usr/local/lib/mjpg-streamer/input_uvc.so -y -d /dev/video2 -n -f 6 -r 640x480" -o "/usr/local/lib/mjpg-streamer/output_http.so -p 8085 -w /usr/local/share/mjpg-streamer/www"

YouTube Demonstration:









2023年8月24日 星期四

Raspberry Pi self-hosted ftp server and SSH remote access

Purpose:

This Project 利用Raspberry Pi 建構一個自己的FTP server, 並利用cpolar穿網intranet penetration cloud service在任何有網路的地方用電腦或手機去存取想要的資料. 也可Access and control your Raspberry Pi remotely via SSH and AnyDesk.

This Project use Raspberry Pi to build your own FTP server, and use cpolar intranet penetration cloud service to access the desired data with your computer or mobile phone anywhere there is a network. You can also Access and control your Raspberry Pi remotely via SSH and AnyDesk.

Fundamental:

Raspberry Pi:

Raspberry Pi is a popular embedded system board that is relatively small in size and easy to use. Its applications range from hobbyist to professional work areas.
FTP:

FTP (File Transfer Protocol) FTP server is computer software consisting of one or more programs that can execute commands given by remote client(s) such as receiving, sending, deleting files, creating or removing directories, etc. The software may run as a software component of a program, as a standalone program or even as one or more processes (in the background).

FileZilla Client:
The FileZilla Client not only supports FTP, but also FTP over TLS (FTPS) and SFTP. It is open source software distributed free of charge under the terms of the GNU General Public License.

AnyDesk:


AnyDesk 是一套功能強大的免費遠端遙控軟體,操作方式與 TeamViewer 差不多,但是連線速度更快,可以輕鬆突破防火牆的限制,執行程式後會得到一組電腦的ID,只要有被遙控端的ID,連線後等待對方同意就可以開始遠瑞控制。
AnyDesk is a set of powerful free remote control software. The operation method is similar to TeamViewer, but the connection speed is faster, and it can easily break through the firewall restrictions. After executing the program, you will get a set of computer IDs. As long as you have the ID of the remote terminal , After connecting, wait for the other party to agree to start remote control.

coplar:

cpolar is a secure intranet penetration cloud service, which exposes the local server under the intranet to the public network through a secure tunnel. This enables public network users to access intranet services normally.

Intranet penetration, simply put, means that the data of the intranet can be obtained by the external network and can be mapped to the public network, so that the data of the intranet can be accessed on the public network.
Installing the Cpolar on Raspberry Pi:
1. curl -sL https://git.io/cpolar | sudo bash 
or 
2. sudo wget https://www.cpolar.com/static/downloads/cpolar-stable-linux-arm.zip
3. sudo unzip cpolar-stable-linux-arm.zip
4. ./cpolar version
5. https://dashboard.cpolar.com/signup to register
6. ./cpolar authtoken xxxxxxxxxxxxxxxxxx
7. ./cpolar ssh 22

SSH:

The Secure Shell Protocol (SSH) is a cryptographic network protocol for operating network services securely over an unsecured network. Its most notable applications are remote login and command-line execution.

PuTTY:

PuTTY is an SSH and telnet client, developed originally by Simon Tatham for the Windows platform. PuTTY is open source software that is available with source code and is developed and supported by a group of volunteers.



YouTubeDemo:




2023年8月23日 星期三

Raspberry Pi self-hosted MQTT environment for Internet of Thing

Purpose:

In this Project I will show how I have self-hosted an MQTT environment with the help of a Mosquitto broker running on a Raspberry Pi and let ESP32 microcontrollers publish sensor data and subscribe to control LED through the broker.  these data will be persisted in a Node-RED UI. (Based on the ESP32 MQTT Publish/Subscribe DS18B20 Temperature to Node-Red Video on my channel) and use Cpolar enables public network users to access intranet services normally.

Fundamental :

MQTT (MQ Telemetry Protocol) is a lightweight machine-to-machine communication protocol that works on top of TCP/IP and it is very much suited for usage in an Internet-of-Things scenario. MQTT uses the publish-subscribe pattern where the clients subscribe to topics and publish messages through a broker. There are many different framework implementations of MQTT as well as complete cloud services that use MQTT (Adafruit IO, CloudMQTT, ThingMQ etc). The cloud services are nice and easy to use, but if you don’t want your data uploaded to a third party storage, you can host an MQTT environment yourself and have full control of the gathered data. Even if you decide to upload data to the cloud, the local MQTT environment can work as a gateway where data goes out to the Internet only from one single place on the Intranet. This makes it easier to control and change the outgoing data flow.

Installing the Mosquitto broker on Raspberry Pi:
1. sudo apt-get update
2. sudo apt-get upgrade
3. sudo apt-get install mosquitto mosquitto-clients
To conduct a small communication test, start the subscriber and publisher client applications by opening two separate consoles in our Raspberry Pi and typing the following commands:
Console 1: mosquitto_sub -h localhost -t /test/topic
Console 2: mosquitto_pub -h localhost -t /test/topic -m “Hello”
Node-RED is a programming tool for wiring together hardware devices, APIs and online services in new and interesting way. It provides a browser-based editor that makes it easy to wire together flows using the wide range of nodes in the palette that can be deployed to its runtime in a single-click.

Installing the Node-RED on Raspberry Pi:
This command will install several things to our Raspberry Pi. These packages include Node.js, npm, and Node-RED itself.
1. bash <(curl -sL https://raw.githubusercontent.com/node-red/linux-installers/master/deb/update-nodejs-and-nodered)
2. Are you really sure you want to do this ? [y/N] ? Y
3. Would you like to install the Pi-specific nodes ? [y/N] ? Y

cpolar is a secure intranet penetration cloud service, which exposes the local server under the intranet to the public network through a secure tunnel. This enables public network users to access intranet services normally.
Intranet penetration, simply put, means that the data of the intranet can be obtained by the external network and can be mapped to the public network, so that the data of the intranet can be accessed on the public network.
Installing the Cpolar on Raspberry Pi:
1. curl -sL https://git.io/cpolar | sudo bash
2. ./cpolar version
3. https://dashboard.cpolar.com/signup to register
4. ./cpolar authtoken xxxxxxxxxxxxxxxxxx
5. ./cpolar mosquitto 1883

Raspberry Pi is a popular embedded system board that is relatively small in size and easy to use. Its applications range from hobbyist to professional work areas.

YouTube Demo:
Code Introduce:
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <OneWire.h>
#include <DallasTemperature.h>
#include <BluetoothSerial.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>
//----------------------------------------------------------------------
#define NTP_SERVER     "pool.ntp.org"
#define UTC_OFFSET     0
#define UTC_OFFSET_DST 0
//-----   DS18B20  ------------------
#define DQ_Pin 4
OneWire oneWire(DQ_Pin);
DallasTemperature sensors(&oneWire);

byte data[12]; // buffer for data
byte address[8]; // 64 bit device address
//float Humidity;
float Temperature;
// ------ 以下修改成你自己的WiFi帳號密碼 ------
char* ssid = "Winson_Y52";
char* password = "8888888888";
//----PM2.5--------
String AQIBuffer;
//申請API key 環保署: https://data.epa.gov.tw/api-term
//查看空氣品質列表:https://data.epa.gov.tw/api/v2/aqx_p_432?offset=0&format=json&api_key=你的APIkey
String APIkey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; //你的環保署網站 API Key
//String Area = "橋頭"; //希望取得空氣品質的地點
String Area = "新竹"; //希望取得空氣品質的地點
String url = "https://data.epa.gov.tw/api/v2/aqx_p_432?format=json&limit=5&api_key=" + APIkey + "&filters=SiteName,EQ," + Area ; //PM2.5的網址
// ------ 以下修改成你MQTT設定 ------
//char* MQTTServer = "mqtt.eclipseprojects.io";//免註冊MQTT伺服器
//char* MQTTServer = "test.mosquitto.org";//免註冊MQTT伺服器
char* MQTTServer = "1.tcp.cpolar.io";
int MQTTPort = 10521;//MQTT Port
char* MQTTUser = "";//
char* MQTTPassword = "";//
//推播主題1:推播溫度
char* MQTTPubTopic1 = "winsondiy/ESP32/temp";
//推播主題2:推播濕度
char* MQTTPubTopic2 = "winsondiy/ESP32/pm2.5_avg";
//訂閱主題1:改變LED燈號
char* MQTTSubTopic1 = "winsondiy/ESP32/led";
long MQTTLastPublishTime;//此變數用來記錄推播時間
long MQTTPublishInterval = 5000;//每5秒推撥一次

WiFiClient WifiClient;
PubSubClient MQTTClient(WifiClient);

unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
//--------- Flag structure --------------------------------------
typedef struct _vFlag
{
  uint8_t BTFlag = 0;
  uint8_t DC_Flag = 0;
  uint8_t CANFlag = 0;
  uint8_t I2C_Flag = 0;
  uint8_t RFIDWrite = 0;
  uint8_t RFIDRead = 0;
  uint8_t dht22 = 0;
  uint8_t sensor1_Flag = 0;
  uint8_t initial_Flag = 0;
  uint8_t FunctionFlag = 0;
  uint8_t DS18B20Flag = 0;
} vFlag;
vFlag *flag_Ptr;
vFlag flag;
//--------- uart structure --------------------------------------
//----------uart--------------
#define LINE_BUFFER_LENGTH 64
typedef struct _vUart
{
  char c;
  int lineIndex = 0;
  int line1Index = 0;
  int BTlineIndex = 0;
  bool lineIsComment;
  bool lineSemiColon;
  char line[128];
  char BTline[20];
  String inputString;
  String BTinputString;
  String S1inputString;
  int V[16];
  char ctemp[30];
  char I2C_Data[80];
  int DC_Spped = 50;
  float Voltage[16];
  int Buffer[128];
  int StartCnt = 0;
  int ReadCnt = 0;
  int sensorValue = 0;
} vUart;
vUart *Uart_Ptr;
vUart Uart;

//---------------------------------------------------------------------------------
#ifndef LED_BUILTIN
#define LED_BUILTIN 2
#endif
//----------------------------------------------------------------
TaskHandle_t hled;
TaskHandle_t huart;
//------------------------------------------------------------------------------
void initial()
{
  Serial.println(F("Create Task"));
  //----------------------------------------------------------------------
  xTaskCreatePinnedToCore(
    vUARTTask, "UARTTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 3 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &huart //handle
    ,
    0);

  //--------------- create task----------------------------------
  xTaskCreatePinnedToCore(
    vLEDTask, "LEDTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 2 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &hled //handle
    ,
    0);
  //----------------------------------------
  //----------------------------------------------------------------------
  //vTaskSuspend(hfunction); //暫停TASK運行
  //----------------------------------------------------------------------
}

void setup()
{
  Serial.begin(9600);
  Serial.println(F("init"));
  initial();
  pinMode(LED_BUILTIN, OUTPUT);
  pinMode(16, OUTPUT);
  //-------------------------
  //----------------------------
  Serial.println(ssid);
  //WiFi.mode(WIFI_STA);
  //WiFi.begin(ssid, password);
  //WiFi.begin("Wokwi-GUEST", "", 6);
  WifiConnect();
  //-----------------------------------------
  //MQTTClient.setServer(MQTTServer, MQTTPort);
  //MQTTClient.setCallback(MQTTCallback);
  //------------------------------------------
 
  //---------------------------------------------
  if (oneWire.search(address))
  {
    Serial.println("Slave device found!");
    Serial.print("Device Address = ");
    Serial.println(address[0]);
  }
  else
  {
    Serial.println("Slave device not found!");
  }
  //-----DS18B20-----------
  sensors.begin();
  //dht.begin();
}

void loop()
{
  Serial.print(F("Main at core:"));
  Serial.println(xPortGetCoreID());
  while(1)
  {
    if (WiFi.status() != WL_CONNECTED)
    {
      WifiConnect();
    }
   
    if (!MQTTClient.connected())
    {
      MQTTConnect();
    }
    //如果距離上次傳輸已經超過10秒,則Publish溫度
    if ((millis() - MQTTLastPublishTime) >= MQTTPublishInterval )
    {
      //---ds18b20-----
      sensors.requestTemperatures();
      Temperature=sensors.getTempCByIndex(0);
      Serial.println(sensors.getTempCByIndex(0));
      MQTTClient.publish(MQTTPubTopic1, String(Temperature).c_str());
      Serial.println("Temperature Publish to MQTT Broker");
      //---------------------------
     
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      MQTTClient.publish(MQTTPubTopic2, String(AQIBuffer).c_str());
      Serial.println("pm2.5_avg Publish to MQTT Broker");
     
      MQTTLastPublishTime = millis();
    }
    MQTTClient.loop();//update status
    delay(50);
    //----------------------------
    if(flag.DS18B20Flag == 1)
    {
      vDS18B20Task();
    }
    if(flag.DS18B20Flag == 2)
    {
      vNodeRedTask();
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      flag.DS18B20Flag =0;
    }
   
   
  }
}
//-------------------------------------------
void vUARTTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.print(F("UARTTask at core:"));
  Serial.println(xPortGetCoreID());
  vTaskDelay(100);
  for (;;)
  {
    while (Serial.available() > 0)
    {
      Uart.c = Serial.read();
 
      if ((Uart.c == '\n') || (Uart.c == '\r'))
      { // End of line reached
        if (Uart.lineIndex > 0)
        { // Line is complete. Then execute!
          Uart.line[Uart.lineIndex] = '\0'; // Terminate string
          //Serial.println( F("Debug") );
          //Serial.println( Uart.inputString );
          processCommand(Uart.line); // do something with the command
          Uart.lineIndex = 0;
          Uart.inputString = "";
        }
        else
        {
          // Empty or comment line. Skip block.
        }
        Uart.lineIsComment = false;
        Uart.lineSemiColon = false;
        Serial.println(F("ok>"));
      }
      else
      {
        //Serial.println( c );
        if ((Uart.lineIsComment) || (Uart.lineSemiColon))
        {
          if (Uart.c == ')')
            Uart.lineIsComment = false; // End of comment. Resume line.
        }
        else
        {
          if (Uart.c == '/')
          { // Block delete not supported. Ignore character.
          }
          else if (Uart.c == '~')
          { // Enable comments flag and ignore all characters until ')' or EOL.
            Uart.lineIsComment = true;
          }
          else if (Uart.c == ';')
          {
            Uart.lineSemiColon = true;
          }
          else if (Uart.lineIndex >= LINE_BUFFER_LENGTH - 1)
          {
            Serial.println("ERROR - lineBuffer overflow");
            Uart.lineIsComment = false;
            Uart.lineSemiColon = false;
          }
          else if (Uart.c >= 'a' && Uart.c <= 'z')
          { // Upcase lowercase
            Uart.line[Uart.lineIndex] = Uart.c - 'a' + 'A';
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += (char)(Uart.c - 'a' + 'A');
          }
          else
          {
            Uart.line[Uart.lineIndex] = Uart.c;
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += Uart.c;
          }
        }
      }
    } //while (Serial.available() > 0)
    vTaskDelay(5);
  }
}
//-------------------------------------------------------------------------
static void vLEDTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.println(F("LEDTask at core:"));
  Serial.println(xPortGetCoreID());
  pinMode(LED_BUILTIN, OUTPUT);
  for (;;) // A Task shall never return or exit.
  {
    digitalWrite(LED_BUILTIN, HIGH); // turn the LED on (HIGH is the voltage level)
    vTaskDelay(200);
    digitalWrite(LED_BUILTIN, LOW); // turn the LED off by making the voltage LOW
    vTaskDelay(200);
  }
}
//----------------------------------------
void processCommand(char *data)
{
  int len, xlen, ylen, zlen, alen;
  int tempDIO;
  String stemp;

  len = Uart.inputString.length();
  //---------------------------------------
  if (strstr(data, "VER") != NULL)
  {
    Serial.println(F("ESP32_20230811"));
  }
  if (strstr(data, "DS18B20_ON") != NULL)
  {
    flag.DS18B20Flag = 1;
    Serial.println(F("DS18B20_ON"));

  }
  if (strstr(data, "NODE_RED") != NULL)
  {
    flag.DS18B20Flag = 2;
    Serial.println(F("NODE_RED"));

  }
  if (strstr(data, "DS18B20_OFF") != NULL)
  {
    flag.DS18B20Flag = 0;
    Serial.println(F("DS18B20_OFF"));

  }
 
}
//--------------------------------------------
void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Switch on the LED if an 1 was received as first character
  if ((char)payload[0] == '1')
  {
    digitalWrite(LED_BUILTIN, LOW);   // Turn the LED on (Note that LOW is the voltage level
  }
  else
  {
    digitalWrite(LED_BUILTIN, HIGH);  // Turn the LED off by making the voltage HIGH
  }

}
//-----------------------------------------
//開始WiFi連線
void WifiConnect()
{
  //開始WiFi連線
  //WiFi.begin("Wokwi-GUEST", "", 6);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED)
  {
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi連線成功");
  Serial.print("IP Address:");
  Serial.println(WiFi.localIP());
}
//----------------------------------------
void MQTTConnect()
{
  MQTTClient.setServer(MQTTServer, MQTTPort);
  MQTTClient.setCallback(MQTTCallback);
  while (!MQTTClient.connected())
  {
    //以亂數為ClietID
    String MQTTClientid = "esp32-" + String(random(1000000, 9999999));
    if (MQTTClient.connect(MQTTClientid.c_str(), MQTTUser, MQTTPassword))
    {
      //連結成功,顯示「已連線」。
      Serial.println("MQTT已連線");
      //訂閱SubTopic1主題
      MQTTClient.subscribe(MQTTSubTopic1);
    }
    else
    {
      //若連線不成功,則顯示錯誤訊息,並重新連線
      Serial.print("MQTT連線失敗,狀態碼=");
      Serial.println(MQTTClient.state());
      Serial.println("五秒後重新連線");
      delay(5000);
    }
  }
}
//------------------------------------------
//接收到訂閱時
void MQTTCallback(char* topic, byte* payload, unsigned int length)
{
  Serial.print(topic); Serial.print("訂閱通知:");
  String payloadString;//將接收的payload轉成字串
  //顯示訂閱內容
  for (int i = 0; i < length; i++)
  {
    payloadString = payloadString + (char)payload[i];
  }
  Serial.println(payloadString);
  //比對主題是否為訂閱主題1
  if (strcmp(topic, MQTTSubTopic1) == 0)
  {
    Serial.println("改變燈號:" + payloadString);
    if (payloadString == "ON")
    {
      digitalWrite(16, HIGH);
    }
    if (payloadString == "OFF")
    {
      digitalWrite(16, LOW);
    }
  }
}
//-------------------------------------------
void vNodeRedTask()
{
  //Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}
//-------------------------------------------
void vDS18B20Task()
{
  Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}

String httpGETRequest1(const char* serverName)
{
  HTTPClient http;
  //String AQI;
 
  // Your IP address with path or Domain name with URL path
  http.begin(serverName);
 
  // Send HTTP POST request
  int httpResponseCode = http.GET();
 
  String payload = "{}";
 
  //if (httpResponseCode > 0) {
  if (httpResponseCode == HTTP_CODE_OK) {         // 如果取得資料成功
    //Serial.print("HTTP Response code: ");
    //Serial.println(httpResponseCode);
    payload = http.getString();
    //JSON格式解析
    //payload = "[" + payload + "]"; //將資料轉為JSON 陣列格式
    DynamicJsonDocument AQJarray(payload.length() * 2);
    deserializeJson(AQJarray, payload);//解析payload為JSON Array格式
    String AQI = AQJarray["records"][0]["pm2.5_avg"];
    //AQI = AQJarray["records"][0]["aqi"];
    AQIBuffer = AQI;
    //Serial.println(Area + " AQI:" + AQI);
    //-------------------------
   
  }
  else {
    Serial.print("Error code: ");
    Serial.println(httpResponseCode);
  }
  // Free resources
  http.end();

  //return payload;  
  return AQIBuffer;
}

//-----------------------------------



2023年8月19日 星期六

ESP32 MQTT Publish/Subscribe DS18B20 Temperature to Node-Red

Purpose:
利用ESP32偵測DS18B20的溫度以及接台灣環保署的天氣資訊將其訊息整合並利用公用MQTT broker的發布與訂閱Switch ON/OFF 來做LED燈號切換.
This Project use ESP32 to detect the temperature of DS18B20 and receive the weather information from the Taiwan Environmental Protection Agency to integrate the information and use the public MQTT broker to publish and subscribe and Switch ON/OFF to switch the LED light.

Fundamental:
MQTT(Message Queuing Telemetry Transport)訊息佇列遙測傳輸
MQTT是一種基於「發布(Publish)∕訂閱(Subsribe)」機制的訊息傳輸協定 (MQTT is a Client Server publish/subscribe messaging transport protocol), 我們可以把它想成在公佈欄上張貼發行訂閱的機制. MQTT訊息發送端, 相當於把訊息交給公佈欄的代理人(broker), 來統籌管理發行和訂閱事宜. 每一個訊息來源都有個唯一的主題名稱.
MQTT (Message Queuing Telemetry Transport) message queue telemetry transmission
MQTT is a message transfer protocol based on the "Publish (Publish)/Subscribe (Subscribe)" mechanism (MQTT is a Client Server publish/subscribe messaging transport protocol), we can think of it as publishing and subscribing on the bulletin board Mechanism. The MQTT message sender is equivalent to handing the message to the broker (broker) of the bulletin board to coordinate and manage the distribution and subscription matters. Each message source has a unique topic name.
代理人是個伺服器軟體, 向伺服器發送主題的一方是發布者(publisher), 從伺服器獲取主題的一方則是訂閱者(subscriber).

MQTT的標頭採用數字編碼,整個長度只佔2位元組,等同兩個字元,後面跟著訊息的主題(topic)和內容(payload).
MQTT Explorer
測試有沒有成功連線到 MQTT Broker. 
我們先到 MQTT Explorer去下載工具,連結如下。
http://mqtt-explorer.com/
host 設定 mqtt.eclipseprojects.io 要跟ESP32 code 一樣
Node-RED
Node-RED is a programming tool for wiring together hardware devices, APIs and online services in new and interesting ways. It provides a browser-based editor that makes it easy to wire together flows using the wide range of nodes in the palette that can be deployed to its runtime in a single-click.
Node-RED 是 IBM 以 Node.js 為基礎, 開發出來的視覺化 IOT 開發工具, 透過「流程圖」方式的操作,透過 Node-RED 完成許多後端才能做的事情.
Node-RED is a visual IOT development tool developed by IBM based on Node.js. Through the operation of "flow chart", Node-RED can accomplish many things that can only be done in the back end.
YouTube Demo:
Code  Introduction:
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <OneWire.h>
#include <DallasTemperature.h>
#include <BluetoothSerial.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>

#define NTP_SERVER     "pool.ntp.org"
#define UTC_OFFSET     0
#define UTC_OFFSET_DST 0
//-----   DS18B20  ------------------
#define DQ_Pin 4
OneWire oneWire(DQ_Pin);
DallasTemperature sensors(&oneWire);

byte data[12]; // buffer for data
byte address[8]; // 64 bit device address
float Temperature;
// ------ 以下修改成你自己的WiFi帳號密碼 ------
char* ssid = "your ssid";
char* password = "your password";
//----PM2.5--------
String AQIBuffer;

//申請API key 環保署: https://data.epa.gov.tw/api-term
//查看空氣品質列表:https://data.epa.gov.tw/api/v2/aqx_p_432?offset=0&format=json&api_key=你的APIkey
String APIkey = "your API Key";
//String Area = "橋頭"; //希望取得空氣品質的地點
String Area = "新竹"; //希望取得空氣品質的地點
String url = "https://data.epa.gov.tw/api/v2/aqx_p_432?format=json&limit=5&api_key="
+ APIkey + "&filters=SiteName,EQ," + Area ; //PM2.5的網址
// ------ 以下修改成你MQTT設定 ------
char* MQTTServer = "mqtt.eclipseprojects.io";//免註冊MQTT伺服器
//char* MQTTServer = "test.mosquitto.org";//免註冊MQTT伺服器
int MQTTPort = 1883;//MQTT Port
char* MQTTUser = "";//
char* MQTTPassword = "";//
//推播主題1:推播溫度
char* MQTTPubTopic1 = "winsondiy/ESP32/temp";
//推播主題2:推播濕度
char* MQTTPubTopic2 = "winsondiy/ESP32/pm2.5_avg";
//訂閱主題1:改變LED燈號
char* MQTTSubTopic1 = "winsondiy/ESP32/led";
long MQTTLastPublishTime;//此變數用來記錄推播時間
long MQTTPublishInterval = 5000;//每10秒推撥一次

WiFiClient WifiClient;
PubSubClient MQTTClient(WifiClient);

unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
//--------- Flag structure --------------------------------------
typedef struct _vFlag
{
  uint8_t BTFlag = 0;
  uint8_t DC_Flag = 0;
  uint8_t CANFlag = 0;
  uint8_t I2C_Flag = 0;
  uint8_t RFIDWrite = 0;
  uint8_t RFIDRead = 0;
  uint8_t dht22 = 0;
  uint8_t sensor1_Flag = 0;
  uint8_t initial_Flag = 0;
  uint8_t FunctionFlag = 0;
  uint8_t DS18B20Flag = 0;
} vFlag;
vFlag *flag_Ptr;
vFlag flag;
//--------- uart structure --------------------------------------
//----------uart--------------
#define LINE_BUFFER_LENGTH 64
typedef struct _vUart
{
  char c;
  int lineIndex = 0;
  int line1Index = 0;
  int BTlineIndex = 0;
  bool lineIsComment;
  bool lineSemiColon;
  char line[128];
  char BTline[20];
  String inputString;
  String BTinputString;
  String S1inputString;
  int V[16];
  char ctemp[30];
  char I2C_Data[80];
  int DC_Spped = 50;
  float Voltage[16];
  int Buffer[128];
  int StartCnt = 0;
  int ReadCnt = 0;
  int sensorValue = 0;
} vUart;
vUart *Uart_Ptr;
vUart Uart;

//---------------------------------------------------------------------------------
#ifndef LED_BUILTIN
#define LED_BUILTIN 2
#endif
//----------------------------------------------------------------
TaskHandle_t hled;
TaskHandle_t huart;
//------------------------------------------------------------------------------
void initial()
{
  Serial.println(F("Create Task"));
  //----------------------------------------------------------------------
  xTaskCreatePinnedToCore(
    vUARTTask, "UARTTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 3 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &huart //handle
    ,
    0);

  //--------------- create task----------------------------------
  xTaskCreatePinnedToCore(
    vLEDTask, "LEDTask" // A name just for humans
    ,
    1024 // This stack size can be checked & adjusted by reading the Stack Highwater
    ,
    NULL, 2 // Priority, with 3 (configMAX_PRIORITIES - 1) being the highest, and 0 being the lowest.
    ,
    &hled //handle
    ,
    0);
  //----------------------------------------
  //----------------------------------------------------------------------
  //vTaskSuspend(hfunction); //暫停TASK運行
  //----------------------------------------------------------------------
}

void setup()
{
  Serial.begin(9600);
  Serial.println(F("init"));
  initial();
  pinMode(LED_BUILTIN, OUTPUT);
  pinMode(16, OUTPUT);
  //-------------------------
  //----------------------------
  Serial.println(ssid);

  WifiConnect();
  //-----------------------------------------

  //------------------------------------------
  configTime(8*3600, 0, "pool.ntp.org","time.nist.gov"); // enable NTP for Taipei time
  //configTime(UTC_OFFSET, UTC_OFFSET_DST, NTP_SERVER);
  //---------------------------------------------
  if (oneWire.search(address))
  {
    Serial.println("Slave device found!");
    Serial.print("Device Address = ");
    Serial.println(address[0]);
  }
  else
  {
    Serial.println("Slave device not found!");
  }
  //-----DS18B20-----------
  sensors.begin();

}

void loop()
{
  Serial.print(F("Main at core:"));
  Serial.println(xPortGetCoreID());
  while(1)
  {
    if (WiFi.status() != WL_CONNECTED)
    {
      WifiConnect();
    }
   
    if (!MQTTClient.connected())
    {
      MQTTConnect();
    }

    if ((millis() - MQTTLastPublishTime) >= MQTTPublishInterval )
    {
      //---ds18b20-----
      sensors.requestTemperatures();
      Temperature=sensors.getTempCByIndex(0);
      Serial.println(sensors.getTempCByIndex(0));
      MQTTClient.publish(MQTTPubTopic1, String(Temperature).c_str());
      Serial.println("Temperature Publish to MQTT Broker");
      //---------------------------
     
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      MQTTClient.publish(MQTTPubTopic2, String(AQIBuffer).c_str());
      Serial.println("pm2.5_avg Publish to MQTT Broker");
     
      MQTTLastPublishTime = millis();
    }
    MQTTClient.loop();//update status
    delay(50);
    //----------------------------
    if(flag.DS18B20Flag == 2)
    {
      vNodeRedTask();
      AQIBuffer = httpGETRequest1(url.c_str());
      Serial.print("新竹 pm2.5_avg: ");
      Serial.println(AQIBuffer);
      flag.DS18B20Flag =0;
    }
   
   
  }
}
//-------------------------------------------
void vUARTTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.print(F("UARTTask at core:"));
  Serial.println(xPortGetCoreID());
  vTaskDelay(100);
  for (;;)
  {
    while (Serial.available() > 0)
    {
      Uart.c = Serial.read();
 
      if ((Uart.c == '\n') || (Uart.c == '\r'))
      { // End of line reached
        if (Uart.lineIndex > 0)
        { // Line is complete. Then execute!
          Uart.line[Uart.lineIndex] = '\0'; // Terminate string
          //Serial.println( F("Debug") );
          //Serial.println( Uart.inputString );
          processCommand(Uart.line); // do something with the command
          Uart.lineIndex = 0;
          Uart.inputString = "";
        }
        else
        {
          // Empty or comment line. Skip block.
        }
        Uart.lineIsComment = false;
        Uart.lineSemiColon = false;
        Serial.println(F("ok>"));
      }
      else
      {
        //Serial.println( c );
        if ((Uart.lineIsComment) || (Uart.lineSemiColon))
        {
          if (Uart.c == ')')
            Uart.lineIsComment = false; // End of comment. Resume line.
        }
        else
        {
          if (Uart.c == '/')
          { // Block delete not supported. Ignore character.
          }
          else if (Uart.c == '~')
          { // Enable comments flag and ignore all characters until ')' or EOL.
            Uart.lineIsComment = true;
          }
          else if (Uart.c == ';')
          {
            Uart.lineSemiColon = true;
          }
          else if (Uart.lineIndex >= LINE_BUFFER_LENGTH - 1)
          {
            Serial.println("ERROR - lineBuffer overflow");
            Uart.lineIsComment = false;
            Uart.lineSemiColon = false;
          }
          else if (Uart.c >= 'a' && Uart.c <= 'z')
          { // Upcase lowercase
            Uart.line[Uart.lineIndex] = Uart.c - 'a' + 'A';
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += (char)(Uart.c - 'a' + 'A');
          }
          else
          {
            Uart.line[Uart.lineIndex] = Uart.c;
            Uart.lineIndex = Uart.lineIndex + 1;
            Uart.inputString += Uart.c;
          }
        }
      }
    } //while (Serial.available() > 0)
    vTaskDelay(5);
  }
}
//-------------------------------------------------------------------------
static void vLEDTask(void *pvParameters)
{
  (void)pvParameters;

  Serial.println(F("LEDTask at core:"));
  Serial.println(xPortGetCoreID());
  pinMode(LED_BUILTIN, OUTPUT);
  for (;;) // A Task shall never return or exit.
  {
    digitalWrite(LED_BUILTIN, HIGH); // turn the LED on (HIGH is the voltage level)
    vTaskDelay(200);
    digitalWrite(LED_BUILTIN, LOW); // turn the LED off by making the voltage LOW
    vTaskDelay(200);
  }
}
//----------------------------------------
void processCommand(char *data)
{
  int len, xlen, ylen, zlen, alen;
  int tempDIO;
  String stemp;

  len = Uart.inputString.length();
  //---------------------------------------
  if (strstr(data, "VER") != NULL)
  {
    Serial.println(F("ESP32_20230811"));
  }
  if (strstr(data, "DS18B20_ON") != NULL)
  {
    flag.DS18B20Flag = 1;
    Serial.println(F("DS18B20_ON"));

  }
  if (strstr(data, "NODE_RED") != NULL)
  {
    flag.DS18B20Flag = 2;
    Serial.println(F("NODE_RED"));

  }
  if (strstr(data, "DS18B20_OFF") != NULL)
  {
    flag.DS18B20Flag = 0;
    Serial.println(F("DS18B20_OFF"));

  }
 
}
//-----------------------------------------

//--------------------------------------------
void WifiConnect()
{

  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED)
  {
    delay(500);
    Serial.print(".");
    spinner();
  }
  Serial.println("WiFi連線成功");
  Serial.print("IP Address:");
  Serial.println(WiFi.localIP());
}
//----------------------------------------
void MQTTConnect()
{
  MQTTClient.setServer(MQTTServer, MQTTPort);
  MQTTClient.setCallback(MQTTCallback);
  while (!MQTTClient.connected())
  {
    //以亂數為ClietID
    String MQTTClientid = "esp32-" + String(random(1000000, 9999999));
    if (MQTTClient.connect(MQTTClientid.c_str(), MQTTUser, MQTTPassword))
    {
      Serial.println("MQTT已連線");
      MQTTClient.subscribe(MQTTSubTopic1);
    }
    else
    {
      Serial.print("MQTT連線失敗,狀態碼=");
      Serial.println(MQTTClient.state());
      Serial.println("五秒後重新連線");
      delay(5000);
    }
  }
}
//------------------------------------------
void MQTTCallback(char* topic, byte* payload, unsigned int length)
{
  Serial.print(topic); Serial.print("訂閱通知:");
  String payloadString;
  for (int i = 0; i < length; i++)
  {
    payloadString = payloadString + (char)payload[i];
  }
  Serial.println(payloadString);
  if (strcmp(topic, MQTTSubTopic1) == 0)
  {
    Serial.println("改變燈號:" + payloadString);
    if (payloadString == "ON")
    {
      digitalWrite(16, HIGH);
    }
    if (payloadString == "OFF")
    {
      digitalWrite(16, LOW);
    }
  }
}
//-------------------------------------------
void vNodeRedTask()
{
  //Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}
//-------------------------------------------
void vDS18B20Task()
{
  Serial.print("Temperatures --> ");
  sensors.requestTemperatures();
  Serial.println(sensors.getTempCByIndex(0));
}

String httpGETRequest1(const char* serverName)
{
  HTTPClient http;
  //String AQI;
 
  // Your IP address with path or Domain name with URL path
  http.begin(serverName);
 
  // Send HTTP POST request
  int httpResponseCode = http.GET();
 
  String payload = "{}";
 
  //if (httpResponseCode > 0) {
  if (httpResponseCode == HTTP_CODE_OK) {        

    payload = http.getString();

    DynamicJsonDocument AQJarray(payload.length() * 2);
    deserializeJson(AQJarray, payload);
    String AQI = AQJarray["records"][0]["pm2.5_avg"];
    AQIBuffer = AQI;
    //-------------------------
   
  }
  else {
    Serial.print("Error code: ");
    Serial.println(httpResponseCode);
  }
  http.end();

  return AQIBuffer;
}
//-----------------------------------