You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

707 lines
21 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"database/sql"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"go_mqtt/models"
_ "go_mqtt/models"
"go_mqtt/mydb"
_ "go_mqtt/mydb"
"io/ioutil"
"log"
"os"
"strings"
"time"
)
// ExitFlag is a package-level boolean that starts out false.
// NOTE(review): nothing in the visible part of this file reads or writes it;
// presumably it signals shutdown to code elsewhere — confirm before relying on it.
var ExitFlag bool
// getMySqlDB opens a connection pool to the "appserver" MySQL database and
// verifies it with a ping.
//
// On success it returns the configured pool. On failure it returns a nil pool
// together with the error from sql.Open or Ping; a pool whose ping failed is
// closed before returning so that it is not leaked.
//
// NOTE(review): the DSN embeds the database host and root credentials in
// source; these should be supplied through configuration or the environment.
func getMySqlDB() (*sql.DB, error) {
	dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// Hand the error back instead of exiting the process so that the
		// caller decides how to react.
		log.Println(err)
		return nil, err
	}
	db.SetConnMaxLifetime(time.Minute * 5) // maximum lifetime of a connection
	db.SetMaxOpenConns(50)                 // maximum number of open connections
	db.SetMaxIdleConns(10)                 // maximum number of idle connections

	// Ping checks that a connection can actually be established.
	if err = db.Ping(); err != nil {
		log.Println("47.242.184.139:3306连接失败")
		fmt.Println("Connection failed:", err)
		db.Close() // do not leak the unusable pool
		return nil, err
	}
	log.Println("47.242.184.139:3306连接成功")
	fmt.Println("Connected successfully")
	return db, nil
}
// moreDBConnect connects to the "appserver" database through
// mydb.NewDatabase, prints the name of the connected database and closes the
// underlying handle when the function returns. A connection failure is
// printed and the function returns without doing anything else.
func moreDBConnect() {
	const (
		dbName   = "appserver"
		dbUser   = "root"
		dbPass   = "Skyinno251,"
		dbHost   = "47.242.184.139"
		dbPortNo = 3306
	)

	appDB, connErr := mydb.NewDatabase(dbName, dbUser, dbPass, dbHost, dbPortNo)
	if connErr != nil {
		fmt.Println("Error connecting to db1:", connErr)
		return
	}
	defer appDB.Db.Close()

	fmt.Println("Connected to databases:", appDB.Name)
}
// queryOneDataByMySql reads the temperature row with the highest id and
// prints it. QueryRow yields at most one row; any error it defers (including
// "no rows") surfaces from Scan and is printed.
func queryOneDataByMySql(db *sql.DB) {
	const latestRowSQL = "SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?"

	var latest models.Temperature
	row := db.QueryRow(latestRowSQL, 1)
	scanErr := row.Scan(
		&latest.Id,
		&latest.CreateDate,
		&latest.DataDate,
		&latest.DataHour,
		&latest.DataMinute,
		&latest.Humidity,
		&latest.LocationDesc,
		&latest.Temperature,
		&latest.Topic,
	)
	if scanErr != nil {
		fmt.Println("QueryRow failed:", scanErr)
		return
	}

	fmt.Printf("Temperature Id:%d\n", latest.Id)
	fmt.Printf("Temperature: %+v\n", latest)
}
// queryMoreDataByMySql reads the temperature rows whose id lies strictly
// between 0 and 30 and prints them. Query (unlike QueryRow) returns a cursor
// over several rows, which is closed when the function returns.
//
// Failures of the query, of scanning a row, or of the iteration itself are
// printed and end the function.
func queryMoreDataByMySql(db *sql.DB) {
	rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30)
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close() // release the cursor once we are done with it

	items := make([]models.Temperature, 0)
	for rows.Next() {
		var temperature models.Temperature
		if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		items = append(items, temperature)
		// Prints everything collected so far after each row.
		fmt.Println(items)
	}
	// rows.Next returns false both at the end of the result set and on error;
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
	}
}
// insertDataByMySql inserts a single row into the temperature table using the
// fields of temperature, then prints the id of the inserted row.
//
// Failures while preparing, executing or reading the inserted id are printed
// and end the function. temperature must not be nil.
func insertDataByMySql(db *sql.DB, temperature *models.Temperature) {
	stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)")
	if err != nil {
		fmt.Println("Prepare failed:", err)
		return
	}
	// A prepared statement holds resources until it is closed.
	defer stmt.Close()

	res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
	if err != nil {
		fmt.Println("Exec failed:", err)
		return
	}
	lastID, err := res.LastInsertId()
	if err != nil {
		fmt.Println("LastInsertId failed:", err)
		return
	}
	fmt.Printf("Inserted Temperature with ID: %d\n", lastID)
}
// insertMoreDataByMySql demonstrates transaction handling: it begins a
// transaction, runs one UPDATE against the users table, rolls back if that
// statement fails and commits otherwise. Each outcome is printed.
//
// The temperature argument is accepted but not used by the current body.
func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) {
	tx, beginErr := db.Begin()
	if beginErr != nil {
		fmt.Println("Begin transaction failed:", beginErr)
		return
	}

	if _, execErr := tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1); execErr != nil {
		tx.Rollback() // undo the transaction when the statement fails
		fmt.Println("Exec failed:", execErr)
		return
	}

	if commitErr := tx.Commit(); commitErr != nil {
		fmt.Println("Commit failed:", commitErr)
		return
	}

	fmt.Println("Transaction committed successfully")
}
// dbTest is a smoke test against a local MySQL "test" database: it opens and
// pings the connection, prints every (id, name) row of the users table,
// inserts one user and deletes the user with id 1.
//
// Every step's error is checked and printed. Connection, query and row
// errors end the function; a failed insert is printed and the delete is still
// attempted.
func dbTest() {
	// Initialise the database connection pool.
	db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test")
	if err != nil {
		fmt.Println("Error opening DB:", err)
		return
	}
	defer db.Close()
	db.SetConnMaxLifetime(time.Minute * 5)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(10)

	// Verify that the database is reachable.
	if err := db.Ping(); err != nil {
		fmt.Println("Connection failed:", err)
		return
	}
	fmt.Println("Connected successfully")

	// Query data. rows is nil when Query fails, so it must not be used then.
	rows, err := db.Query("SELECT id, name FROM users")
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		fmt.Printf("ID: %d, Name: %s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Insert data. db.Exec prepares, runs and releases the statement in one call.
	if _, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Jane Doe", 28); err != nil {
		fmt.Println("Insert failed:", err)
	}
	// Delete data.
	if _, err := db.Exec("DELETE FROM users WHERE id = ?", 1); err != nil {
		fmt.Println("Delete failed:", err)
	}
}
// GenRsaKey generates an RSA key pair of the given bit size and returns both
// halves PEM-encoded.
//
// prvkey is the private key in PKCS#1 form inside an "RSA PRIVATE KEY" block.
// pubkey is the public key in PKIX (SubjectPublicKeyInfo) form inside a
// "PUBLIC KEY" block, which is the label that belongs to that encoding.
// On error both slices are nil.
func GenRsaKey(bits int) (prvkey, pubkey []byte, err error) {
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		return nil, nil, err
	}
	prvkey = pem.EncodeToMemory(&pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
	})

	// Derive the public key from the private key.
	derPkix, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		// Do not hand back half of a key pair.
		return nil, nil, err
	}
	pubkey = pem.EncodeToMemory(&pem.Block{
		Type:  "PUBLIC KEY",
		Bytes: derPkix,
	})
	return prvkey, pubkey, nil
}
// ParseRSAPrivateKeyFromPEM decodes the first PEM block found in pemData and
// parses its contents as a PKCS#1 RSA private key.
//
// It returns an error when pemData contains no PEM block or when the block
// does not hold a valid PKCS#1 private key.
func ParseRSAPrivateKeyFromPEM(pemData string) (*rsa.PrivateKey, error) {
	pemBlock, _ := pem.Decode([]byte(pemData))
	if pemBlock == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}
	// The parser already yields (nil, err) on failure.
	return x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
}
// parsePrivateKey loads privateKeyFile, decodes the first PEM block in it and
// parses that block as a PKCS#1 RSA private key.
//
// It returns the read error when the file cannot be read, an error when the
// file holds no PEM block, and the parser's error when the block is not a
// valid PKCS#1 private key.
func parsePrivateKey(privateKeyFile string) (*rsa.PrivateKey, error) {
	raw, readErr := ioutil.ReadFile(privateKeyFile)
	if readErr != nil {
		return nil, readErr
	}

	pemBlock, _ := pem.Decode(raw)
	if pemBlock == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}

	// The parser already yields (nil, err) on failure.
	return x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
}
// GenerateRSAKey generates an RSA key pair of the given bit size and writes it
// to the current working directory:
//
//   - private.pem holds the private key, PKCS#8 encoded, in a "PRIVATE KEY" block;
//   - public.pem holds the public key, PKIX encoded, in a "PUBLIC KEY" block.
//
// The function has no return value, so any failure is printed and ends the
// function; it never terminates the process. Both keys are serialised before
// either file is written, so an encoding failure leaves no files behind.
func GenerateRSAKey(bits int) {
	// rand.Reader is the shared cryptographically secure random source.
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	// Serialise both halves to DER. PKCS#8 is used for the private key; it is
	// a different format from PKCS#1.
	pkcs8Private, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	pkixPublic, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	// The private key is created readable by the owner only.
	if err := writeRSAPEMFile("private.pem", "PRIVATE KEY", pkcs8Private, 0600); err != nil {
		fmt.Println(err.Error())
		return
	}
	if err := writeRSAPEMFile("public.pem", "PUBLIC KEY", pkixPublic, 0644); err != nil {
		fmt.Println(err.Error())
	}
}

// writeRSAPEMFile creates (or truncates) path and writes der into it as a
// single PEM block of the given type, closing the file before returning.
func writeRSAPEMFile(path, blockType string, der []byte, perm os.FileMode) error {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	if err := pem.Encode(f, &pem.Block{Type: blockType, Bytes: der}); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}
// RSA_Decrypts decrypts cipherText with the RSA private key stored in the PEM
// file at path, supporting segmented ciphertext.
//
// The key file must hold a PKCS#8 encoded RSA private key. cipherText is
// treated as a concatenation of PKCS#1 v1.5 blocks, each as long as the key
// modulus; the blocks are decrypted in order and their plaintexts joined.
//
// It returns nil when the file cannot be read, contains no PEM block, does not
// hold a PKCS#8 RSA private key, or when any block fails to decrypt. Key
// problems are printed; the process is never terminated.
func RSA_Decrypts(cipherText []byte, path string) []byte {
	// Read the private key file.
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		return nil
	}
	// PEM decode; block is nil when the data holds no PEM block.
	block, _ := pem.Decode(keyData)
	if block == nil {
		fmt.Println("private key error: no PEM block found")
		return nil
	}
	// X.509 (PKCS#8) decode.
	parsedKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	// PKCS#8 can carry non-RSA keys, so the assertion must be checked.
	p, ok := parsedKey.(*rsa.PrivateKey)
	if !ok {
		fmt.Println("private key error: not an RSA key")
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes()
}
// RSA_DecryptsOne decrypts cipherText with the PEM-encoded RSA private key
// passed in privateKeyStr, supporting segmented ciphertext.
//
// privateKeyStr must hold a PKCS#8 encoded RSA private key. cipherText is
// treated as a concatenation of PKCS#1 v1.5 blocks, each as long as the key
// modulus; the blocks are decrypted in order and their plaintexts joined.
//
// It returns nil when privateKeyStr contains no PEM block, does not hold a
// PKCS#8 RSA private key, or when any block fails to decrypt. Key problems are
// printed; the process is never terminated.
func RSA_DecryptsOne(cipherText []byte, privateKeyStr string) []byte {
	// PEM decode; block is nil when the string holds no PEM block.
	block, _ := pem.Decode([]byte(privateKeyStr))
	if block == nil {
		fmt.Println("private key error: no PEM block found")
		return nil
	}
	// X.509 (PKCS#8) decode.
	parsedKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	// PKCS#8 can carry non-RSA keys, so the assertion must be checked.
	p, ok := parsedKey.(*rsa.PrivateKey)
	if !ok {
		fmt.Println("private key error: not an RSA key")
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes()
}
// RsaEncryptBlock encrypts src in segments with the RSA public key stored in
// the PEM file at path.
//
// The key file must hold a PKIX encoded RSA public key. src is split into
// chunks of at most keySize-11 bytes (the PKCS#1 v1.5 limit per block); each
// chunk is encrypted on its own and the resulting blocks are concatenated.
//
// It returns an error when the file cannot be read, contains no PEM block,
// does not hold an RSA public key, or when encrypting a chunk fails.
func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) {
	// Read the public key file.
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	// PEM decode; block is nil when the data holds no PEM block.
	block, _ := pem.Decode(keyData)
	if block == nil {
		return nil, fmt.Errorf("public key error: no PEM block found in %q", path)
	}
	// X.509 (PKIX) decode.
	publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
	if err != nil {
		return nil, err
	}
	// PKIX can carry non-RSA keys, so the assertion must be checked.
	publicKey, ok := publicKeyInterface.(*rsa.PublicKey)
	if !ok {
		return nil, fmt.Errorf("public key error: %q does not hold an RSA key", path)
	}

	keySize, srcSize := publicKey.Size(), len(src)
	log.Println("密钥长度", keySize, "明文长度", srcSize)

	// PKCS#1 v1.5 padding needs 11 bytes of every block.
	once := keySize - 11
	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + once
		if endIndex > srcSize {
			endIndex = srcSize
		}
		// Encrypt one chunk.
		bytesOnce, encErr := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex])
		if encErr != nil {
			return nil, encErr
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes(), nil
}
// timing is a watchdog loop that runs forever. Every 10 seconds it prints the
// tick time and then checks the MQTT connection: if the connection is not
// open it calls client.Connect (without waiting on the returned token),
// otherwise it prints that the client is connected.
//
// Experimental database and RSA round-trip calls that used to run on each
// tick are disabled and are not part of the loop.
func timing(client mqtt.Client) {
	ticker := time.NewTicker(10 * time.Second)
	for tick := range ticker.C {
		fmt.Println("定时器====>", tick.String())
		if client.IsConnectionOpen() {
			fmt.Println("client has connect")
			continue
		}
		client.Connect()
	}
}
// messagePubHandler is the default handler for every incoming MQTT message.
//
// Messages on the topics "app_push", "app_push_dyw" and "app_push_yf" carry a
// base64-encoded, RSA-encrypted JSON document. It is decrypted, unmarshalled
// into models.User, models.Userdyw or models.Useryf respectively, and then
// updated when the matching Get* call reports true or saved otherwise. A
// message that cannot be decoded, decrypted or unmarshalled is reported and
// dropped without touching the database.
//
// Messages on any other topic are treated as sensor readings of the form
// "<humidity> <temperature>" (split on a single space) and stored through
// models.SaveTemperature, stamped with the current time. Payloads with fewer
// than two fields are reported and dropped.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
	message := string(msg.Payload())
	topic := msg.Topic()
	fmt.Println(message)
	fmt.Println(topic)

	// NOTE(review): a private key is embedded in source here; it should be
	// loaded from protected configuration instead.
	privateKeyPEM := `-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCYwvInDUICXbmWcOAR5hm86mz7WlKL3dDf12MSeIY2jm5QpUSSoMhrJOWLbOt5fXQepPaNO0M30A+3C4SFZrM/9WA5ehazO1u1m1LAkYzoSDRH3MMJsrJC3lCGDeOROteu+safuP9k/npU1YQu/+Ll2xEJNxyvUx4jLGM4LamiMI6ytM3gdnOAGP4YRQo9Etwo6I986yg/seCQC5rza9M4iBamoin7U8h9yOMKrM6xK/k9CcY/vn5+Uhe3Pvk4qj/2Ff3OXkkc1wfdILqdLwLOKL0Tb3ZciwG0p1CKO80yf5hyYoWjqZk5Rcd07nTo2gqTFGfLl9sqI9/+ipMDtnHlAgMBAAECggEATJd5yCC6lusdMRO5FOBUyUaUi9X2i1AU+RZKAynQySvSnbavUgExW58tRCHBUrGW9gJp59ft1N8J8hHhSO18NDY4H7laBlVdnwmYjRqtFo2VQO6sD4G8JRDION5f2iIxn/b2fYDI9H8vILfJRbNgtTSILyGlzTYUZzhLKxCh+8IsN96Nic8wa5COd1vZZmdhf2y8TG8clFWmozaScNSAATx7y+8XLVWjjWiIRZ6xQvx0uQPUParc9KihXXTKR2pA22yPIdz+U4MGD4kC0eczlcFKZ/dYv9e7OIGgnJfT0idSCu7nYb1pxJ1LxD9fS6IScNTF5dSe0OIL98e+XdyoAQKBgQDRep+5cW4iAKrEMH+djmcXAkoMiYtNVtnu0efLE8dP6vjYytQi368X9SdcASbfrQ31eEZmr/xQnlUF8oyHGkI38YS8dpAHzQcrkP3BljbbzB/3gJZaUdghGsDrK0xAJIzzmFKQpeKnGtr23vxUgaGrNsCYvQ0eQ7+5056KXS4r5QKBgQC6r8xtRSaje6L4WIydjWvYywsmRO0Of0aJLMDA/Wt2MWhHfh7ba9oI1cKGN80ap7xB2a9lQLgpv+C53wNtE5SpvjxsikAj96nUMMhGy9ojXrUith6HQhiINETz6Shnznd+AyrXP6KI/RpfA5nkDB5nrJxODwtYLP467IL7Cv7OAQKBgQCl4KxKdH/5fP28jYsAgJsxpSZt9xzQCU5Zxu396ZOSvUaApVyGoQpNtluMh3z48lhzYOKevgzW6gn5w69z7F8zXZT2iAxVoQ1kelP2z7RxKJrHqpNkwhqbXEwX7RlcUZUr8BqxYCqymJl7k+fMIzqaEalBSbLxnEReKi0I8/Bz4QKBgHK4b0ZCtVDHPEmimJ6E9l4dv/c/afF7swu+zaCK2ouiJvOwBCRQbYb6XPR/u/GCXASXUdpF4CX/vIhcDE3uN2/r8FO+zVWM7vbvF1OyF5WesG7pPW9e5ZZlkG3WvLa1wOZV6fCmMSo/ZwI2Q05JSDHrd43cXttLotrw1jiQ9C4BAoGBAKi4SOoOVQ5J5HQCDkBwPbG1AOLHFinzfoDl26GF/8Hy7fmmd1JiRTFldQp/A9VTAABz3sVYmMB92HSIaJhuDMoYJNI2Cf/cZifsv7vUL8cbLn+lPsKsebiuB0m0g4P2qLwLfegfNGEgA7lA5HIz3SELqbdp3iuqJeQl1fsJqD74
-----END PRIVATE KEY-----`

	switch topic {
	case "app_push":
		fmt.Println("珠海电厂APP收到mqtt消息")
		plain, ok := decryptAppPushPayload(message, privateKeyPEM)
		if !ok {
			return
		}
		fmt.Println("-----user--------")
		var user models.User
		if err := json.Unmarshal(plain, &user); err != nil {
			fmt.Println(err.Error())
			return
		}
		fmt.Println(user)
		if models.GetUser(&user) {
			models.UpdateUser(&user)
		} else {
			models.SaveUser(&user)
		}

	case "app_push_dyw":
		fmt.Println("大亚湾电厂APP收到mqtt消息")
		plain, ok := decryptAppPushPayload(message, privateKeyPEM)
		if !ok {
			return
		}
		fmt.Println("-----userdyw--------")
		var user models.Userdyw
		if err := json.Unmarshal(plain, &user); err != nil {
			fmt.Println(err.Error())
			return
		}
		fmt.Println(user)
		if models.GetUserdyw(&user) {
			models.UpdateUserdyw(&user)
		} else {
			models.SaveUserdyw(&user)
		}

	case "app_push_yf":
		fmt.Println("云浮电厂APP收到mqtt消息")
		plain, ok := decryptAppPushPayload(message, privateKeyPEM)
		if !ok {
			return
		}
		fmt.Println("-----useryf--------")
		var user models.Useryf
		if err := json.Unmarshal(plain, &user); err != nil {
			fmt.Println(err.Error())
			return
		}
		fmt.Println(user)
		if models.GetUseryf(&user) {
			models.UpdateUseryf(&user)
		} else {
			models.SaveUseryf(&user)
		}

	default:
		result := strings.Split(message, " ")
		if len(result) < 2 {
			// Indexing result[1] below would panic on such a payload.
			fmt.Println("unexpected sensor payload, want \"<humidity> <temperature>\":", message)
			return
		}
		fmt.Println(result[0])
		fmt.Println(result[1])
		now := time.Now()
		fmt.Println("---------now--------")
		fmt.Println(now)

		temperature := new(models.Temperature)
		temperature.CreateDate = now
		temperature.DataDate = now.Format("2006-01-02")
		temperature.DataHour = now.Format("2006-01-02 15")
		temperature.DataMinute = now.Format("2006-01-02 15:04")
		temperature.Humidity = result[0]
		temperature.Temperature = result[1]
		temperature.Topic = topic
		// Known sensors get a human-readable location; others keep the zero value.
		switch topic {
		case "WifiSHT/7C87CE9CA4E6/SHT20":
			temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
		case "WifiSHT/7C87CE9F5CBF/SHT20":
			temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
		case "WifiSHT/4CEBD686B6AA/SHT20":
			temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
		}
		fmt.Println("测温传感器地点:", temperature.LocationDesc)
		models.SaveTemperature(temperature)
	}
}

// decryptAppPushPayload base64-decodes message and decrypts the result with
// RSA_DecryptsOne using privateKeyPEM. On success it prints the plaintext and
// returns it with true; on a decode or decrypt failure it prints the problem
// and returns false.
func decryptAppPushPayload(message, privateKeyPEM string) ([]byte, bool) {
	cipherText, err := base64.StdEncoding.DecodeString(message)
	if err != nil {
		fmt.Println(err.Error())
		return nil, false
	}
	plain := RSA_DecryptsOne(cipherText, privateKeyPEM)
	if plain == nil {
		fmt.Println("RSA decryption failed")
		return nil, false
	}
	fmt.Println("解密后的字符串:")
	fmt.Println(string(plain))
	return plain, true
}
// connectHandler is installed as the client's OnConnect callback. It prints
// "Connected" and subscribes to the sensor topic filter and to all three
// app-push topics, the same set main1 subscribes to, so that the full set is
// in place whenever this callback runs.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
	subTemperature(client)
	subAppPush(client)
	subAppPushDyw(client)
	subAppPushYf(client)
}
// connectLostHandler is installed as the client's OnConnectionLost callback.
// It prints the error that caused the connection loss on a line of its own.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}
// sub subscribes client to the topic "gomqtt01" at QoS 1 with no
// per-subscription handler and waits for the subscription to complete. It
// prints the error when the subscription fails and a confirmation otherwise.
func sub(client mqtt.Client) {
	topic := "gomqtt01"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Wait only signals completion; the outcome is reported by Error.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subTemperature subscribes client to the topic filter "WifiSHT/+/SHT20" at
// QoS 2 with no per-subscription handler and waits for the subscription to
// complete. It prints the error when the subscription fails and a
// confirmation otherwise.
func subTemperature(client mqtt.Client) {
	topic := "WifiSHT/+/SHT20"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; the outcome is reported by Error.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subAppPush subscribes client to the topic "app_push" at QoS 2 with no
// per-subscription handler and waits for the subscription to complete. It
// prints the error when the subscription fails and a confirmation otherwise.
func subAppPush(client mqtt.Client) {
	topic := "app_push"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; the outcome is reported by Error.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subAppPushDyw subscribes client to the topic "app_push_dyw" at QoS 2 with no
// per-subscription handler and waits for the subscription to complete. It
// prints the error when the subscription fails and a confirmation otherwise.
func subAppPushDyw(client mqtt.Client) {
	topic := "app_push_dyw"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; the outcome is reported by Error.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subAppPushYf subscribes client to the topic "app_push_yf" at QoS 2 with no
// per-subscription handler and waits for the subscription to complete. It
// prints the error when the subscription fails and a confirmation otherwise.
func subAppPushYf(client mqtt.Client) {
	topic := "app_push_yf"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; the outcome is reported by Error.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// publish sends the numbered test messages "Message 0", "Message 1", ... to
// the topic "gomqtt01" at QoS 0 (not retained). After each publish it waits
// for the token, prints the sequence number and sleeps for six seconds. The
// upper bound is so large that the loop is effectively endless.
func publish(client mqtt.Client) {
	const total = 999999000000000000
	for seq := 0; seq < total; seq++ {
		payload := fmt.Sprintf("Message %d", seq)
		client.Publish("gomqtt01", 0, false, payload).Wait()
		fmt.Println(seq)
		time.Sleep(6 * time.Second)
	}
}
// main1 runs the MQTT bridge: it builds a client with a fresh UUID as its
// client id, registers the default message, connect and connection-lost
// handlers, connects to the broker (printing any connect error and carrying
// on), starts the timing watchdog in a goroutine, subscribes to the sensor and
// app-push topics and then blocks forever.
func main1() {
	clientID := uuid.New()
	fmt.Println("Generated UUID:", clientID)

	const (
		brokerHost = "47.242.184.139"
		brokerPort = 1883
	)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", brokerHost, brokerPort))
	opts.SetClientID(clientID.String())
	opts.SetUsername("admin")
	opts.SetPassword("publish452131wW452131wW$")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	connectToken := client.Connect()
	if connectToken.Wait() && connectToken.Error() != nil {
		fmt.Println(connectToken.Error())
	}

	// The watchdog keeps checking the connection in the background.
	go timing(client)

	subTemperature(client)
	subAppPush(client)
	subAppPushDyw(client)
	subAppPushYf(client)

	// Keep the process alive; all work happens in callbacks and goroutines.
	for {
		time.Sleep(5 * time.Second)
	}
}