|
|
package main
|
|
|
|
|
|
import (
|
|
|
"database/sql"
|
|
|
"fmt"
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
|
"github.com/google/uuid"
|
|
|
"go_mqtt/models"
|
|
|
_ "go_mqtt/models"
|
|
|
"go_mqtt/mydb"
|
|
|
_ "go_mqtt/mydb"
|
|
|
"log"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
// getMySqlDB opens a connection pool to the appserver MySQL database,
// applies the pool limits and verifies connectivity with a ping.
//
// It returns the ready-to-use pool, or a nil handle together with the error
// when the driver cannot be opened or the ping fails. The caller owns the
// returned pool and is responsible for closing it.
//
// Troubleshooting a failed ping: check the DSN (user, password, host, port,
// database name), that the MySQL server is running and accepts connections,
// and that firewall/network rules allow access to the MySQL port (3306).
//
// NOTE(review): the DSN embeds credentials in source; consider loading it
// from configuration instead.
func getMySqlDB() (*sql.DB, error) {
	dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local"

	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// Report the failure to the caller; terminating the process here
		// would make the error return value meaningless.
		log.Println(err)
		return nil, err
	}

	db.SetConnMaxLifetime(time.Minute * 5) // maximum lifetime of a connection
	db.SetMaxOpenConns(50)                 // maximum number of open connections
	db.SetMaxIdleConns(10)                 // maximum number of idle connections

	// Check that a connection can actually be established.
	if err := db.Ping(); err != nil {
		log.Println("47.242.184.139:3306连接失败")
		fmt.Println("Connection failed:", err)
		// The pool is not handed to the caller on failure, so release it here.
		_ = db.Close()
		return nil, err
	}

	log.Println("47.242.184.139:3306连接成功")
	fmt.Println("Connected successfully")
	return db, nil
}
|
|
|
|
|
|
func moreDBConnect() {
|
|
|
db1, err := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306)
|
|
|
if err != nil {
|
|
|
fmt.Println("Error connecting to db1:", err)
|
|
|
return
|
|
|
}
|
|
|
defer db1.Db.Close()
|
|
|
|
|
|
fmt.Println("Connected to databases:", db1.Name)
|
|
|
|
|
|
//db2, err := mydb.NewDatabase("db2", "user2", "password2", "localhost", 3306)
|
|
|
//if err != nil {
|
|
|
// fmt.Println("Error connecting to db2:", err)
|
|
|
// return
|
|
|
//}
|
|
|
//defer db2.Conn.Close()
|
|
|
//
|
|
|
//fmt.Println("Connected to databases:", db1.Name, "and", db2.Name)
|
|
|
}
|
|
|
|
|
|
func queryOneDataByMySql(db *sql.DB) {
|
|
|
//DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据:
|
|
|
var temperature models.Temperature
|
|
|
err := db.QueryRow("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?", 1).Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
|
|
|
if err != nil {
|
|
|
fmt.Println("QueryRow failed:", err)
|
|
|
return
|
|
|
}
|
|
|
fmt.Printf("Temperature Id:%d\n", temperature.Id)
|
|
|
fmt.Printf("Temperature: %+v\n", temperature)
|
|
|
}
|
|
|
|
|
|
func queryMoreDataByMySql(db *sql.DB) {
|
|
|
//DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据:
|
|
|
rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30)
|
|
|
if err != nil {
|
|
|
fmt.Println("Query failed:", err)
|
|
|
return
|
|
|
}
|
|
|
defer rows.Close() // 确保在使用完毕后关闭资源
|
|
|
|
|
|
items := make([]models.Temperature, 0)
|
|
|
for rows.Next() {
|
|
|
var temperature models.Temperature
|
|
|
if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil {
|
|
|
fmt.Println("Scan failed:", err)
|
|
|
return
|
|
|
}
|
|
|
items = append(items, temperature)
|
|
|
fmt.Println(items)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 插入单条数据
|
|
|
func insertDataByMySql(db *sql.DB, temperature *models.Temperature) {
|
|
|
//插入操作:
|
|
|
stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)")
|
|
|
if err != nil {
|
|
|
fmt.Println("Prepare failed:", err)
|
|
|
return
|
|
|
}
|
|
|
res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
|
|
|
if err != nil {
|
|
|
fmt.Println("Exec failed:", err)
|
|
|
return
|
|
|
}
|
|
|
lastID, _ := res.LastInsertId()
|
|
|
fmt.Printf("Inserted Temperature with ID: %d\n", lastID)
|
|
|
}
|
|
|
|
|
|
// 事务处理:保证数据一致性
|
|
|
func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) {
|
|
|
tx, err := db.Begin()
|
|
|
if err != nil {
|
|
|
fmt.Println("Begin transaction failed:", err)
|
|
|
return
|
|
|
}
|
|
|
_, err = tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1)
|
|
|
if err != nil {
|
|
|
tx.Rollback() // 如果发生错误,回滚事务
|
|
|
fmt.Println("Exec failed:", err)
|
|
|
return
|
|
|
}
|
|
|
err = tx.Commit() // 提交事务
|
|
|
if err != nil {
|
|
|
fmt.Println("Commit failed:", err)
|
|
|
return
|
|
|
}
|
|
|
fmt.Println("Transaction committed successfully")
|
|
|
}
|
|
|
|
|
|
// dbTest is a self-contained smoke test against a local MySQL "test"
// database: it opens and pings the pool, lists the users table, inserts one
// user and deletes the user with id 1.
//
// Every step reports its error on stdout. Failures while opening, pinging or
// reading abort the function; a failed insert is reported and the delete is
// still attempted.
func dbTest() {
	// Initialise the database connection pool.
	db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test")
	if err != nil {
		fmt.Println("Error opening DB:", err)
		return
	}
	defer db.Close()

	db.SetConnMaxLifetime(time.Minute * 5)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(10)

	// Test the database connection.
	if err := db.Ping(); err != nil {
		fmt.Println("Connection failed:", err)
		return
	}
	fmt.Println("Connected successfully")

	// Query data. The error must be checked before rows is touched: on
	// failure rows is nil and calling methods on it would panic.
	rows, err := db.Query("SELECT id, name FROM users")
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		fmt.Printf("ID: %d, Name: %s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Insert data. DB.Exec runs the one-off statement without leaving a
	// prepared statement open.
	if _, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Jane Doe", 28); err != nil {
		fmt.Println("Insert failed:", err)
	}

	// Delete data.
	if _, err := db.Exec("DELETE FROM users WHERE id = ?", 1); err != nil {
		fmt.Println("Delete failed:", err)
	}
}
|
|
|
|
|
|
func timing(client mqtt.Client) {
|
|
|
//定时器,10秒钟执行一次
|
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
|
for {
|
|
|
time := <-ticker.C
|
|
|
fmt.Println("定时器====>", time.String())
|
|
|
if !client.IsConnectionOpen() {
|
|
|
client.Connect()
|
|
|
} else {
|
|
|
fmt.Println("client has connect")
|
|
|
//moreDBConnect()
|
|
|
//db, err := getMySqlDB()
|
|
|
//if err == nil {
|
|
|
// queryOneDataByMySql(db)
|
|
|
// //mydb.Close()
|
|
|
//}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
|
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
|
|
|
|
|
|
message := fmt.Sprintf("%s", msg.Payload())
|
|
|
topic := fmt.Sprintf("%s", msg.Topic())
|
|
|
fmt.Println(message)
|
|
|
result := strings.Split(message, " ")
|
|
|
fmt.Println(result[0])
|
|
|
fmt.Println(result[1])
|
|
|
|
|
|
now := time.Now()
|
|
|
temperature := new(models.Temperature)
|
|
|
temperature.CreateDate = now
|
|
|
temperature.DataDate = now.Format("2006-01-02")
|
|
|
temperature.DataHour = now.Format("2006-01-02 15")
|
|
|
temperature.DataMinute = now.Format("2006-01-02 15:04")
|
|
|
temperature.Humidity = result[0]
|
|
|
temperature.Temperature = result[1]
|
|
|
temperature.Topic = topic
|
|
|
|
|
|
if topic == "WifiSHT/7C87CE9CA4E6/SHT20" {
|
|
|
temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
|
|
|
}
|
|
|
if topic == "WifiSHT/7C87CE9F5CBF/SHT20" {
|
|
|
temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
|
|
|
}
|
|
|
if topic == "WifiSHT/4CEBD686B6AA/SHT20" {
|
|
|
temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
|
|
|
}
|
|
|
fmt.Println("测温传感器地点:", temperature.LocationDesc)
|
|
|
|
|
|
//db, err := getMySqlDB()
|
|
|
//if err == nil {
|
|
|
// insertDataByMySql(db,temperature)
|
|
|
// //mydb.Close()
|
|
|
//}
|
|
|
|
|
|
models.SaveTemperature(temperature)
|
|
|
|
|
|
//uuid := uuid.New()
|
|
|
//models.SaveProduct()
|
|
|
//models.SaveUser(uuid.String())
|
|
|
//models.SaveOrder()
|
|
|
|
|
|
//var users []models.User
|
|
|
//var orders []models.Order
|
|
|
//var user models.User
|
|
|
//mydb.DB.Where("name = ?", "wenfei").First(&user)
|
|
|
//fmt.Println(user)
|
|
|
|
|
|
// 查询多条记录
|
|
|
//mydb.DB.Find(&users) // 查询所有产品信息
|
|
|
//mydb.DB.Order("id desc").Limit(2).Offset(0).Find(&users) // 按价格降序排序,取前 10 条记录
|
|
|
//fmt.Println("users:", users)
|
|
|
|
|
|
// 查询多条记录
|
|
|
//mydb.DB.Find(&users) // 查询所有产品信息
|
|
|
//mydb.DB.Where("id > ? and id < ?", 60,65).Find(&orders)
|
|
|
//fmt.Println("orders:", orders)
|
|
|
|
|
|
// 原生 SQL 查询
|
|
|
//var products []models.Product
|
|
|
//mydb.DB.Raw("SELECT * FROM product WHERE price > ?", 1000).Scan(&products) // 查询价格大于 1000 的产品信息
|
|
|
//fmt.Println("Products:", products)
|
|
|
|
|
|
}
|
|
|
|
|
|
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
|
|
|
fmt.Println("Connected")
|
|
|
subTemperature(client)
|
|
|
}
|
|
|
|
|
|
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
|
|
|
fmt.Printf("Connect lost: %v", err)
|
|
|
}
|
|
|
|
|
|
func sub(client mqtt.Client) {
|
|
|
topic := "gomqtt01"
|
|
|
token := client.Subscribe(topic, 1, nil)
|
|
|
token.Wait()
|
|
|
fmt.Printf("Subscribed to topic: %s", topic)
|
|
|
}
|
|
|
|
|
|
func subTemperature(client mqtt.Client) {
|
|
|
topic := "WifiSHT/+/SHT20"
|
|
|
token := client.Subscribe(topic, 2, nil)
|
|
|
token.Wait()
|
|
|
fmt.Printf("Subscribed to topic: %s", topic)
|
|
|
}
|
|
|
|
|
|
func publish(client mqtt.Client) {
|
|
|
num := 999999000000000000
|
|
|
for i := 0; i < num; i++ {
|
|
|
text := fmt.Sprintf("Message %d", i)
|
|
|
token := client.Publish("gomqtt01", 0, false, text)
|
|
|
token.Wait()
|
|
|
fmt.Println(i)
|
|
|
time.Sleep(6 * time.Second)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
|
|
|
|
uuid := uuid.New()
|
|
|
fmt.Println("Generated UUID:", uuid)
|
|
|
|
|
|
var broker = "47.242.184.139"
|
|
|
var port = 1883
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
|
|
|
opts.SetClientID(uuid.String())
|
|
|
opts.SetUsername("admin")
|
|
|
opts.SetPassword("publish452131wW452131wW$")
|
|
|
opts.SetDefaultPublishHandler(messagePubHandler)
|
|
|
opts.OnConnect = connectHandler
|
|
|
opts.OnConnectionLost = connectLostHandler
|
|
|
client := mqtt.NewClient(opts)
|
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
fmt.Println(token.Error())
|
|
|
}
|
|
|
|
|
|
//client.IsConnectionOpen()
|
|
|
|
|
|
go timing(client)
|
|
|
|
|
|
//sub(client)
|
|
|
subTemperature(client)
|
|
|
publish(client)
|
|
|
|
|
|
//client.Disconnect(1000)
|
|
|
|
|
|
}
|