diff --git a/.idea/go_mqtt.iml b/.idea/go_mqtt.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/go_mqtt.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..ccad03b --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..35215e3 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + + \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cf681ef --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module go_mqtt + +go 1.21.3 + +require github.com/eclipse/paho.mqtt.golang v1.4.2 + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/go-sql-driver/mysql v1.9.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..155c3bf --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/go-sql-driver/mysql v1.9.0 h1:Y0zIbQXhQKmQgTp44Y1dp3wTXcn804QoTptLZT1vtvo= +github.com/go-sql-driver/mysql v1.9.0/go.mod h1:pDetrLJeA3oMujJuvXc8RJoasr589B6A9fwzD3QMrqw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/install_go.sh b/install_go.sh new file mode 100755 index 0000000..607bec2 --- /dev/null +++ b/install_go.sh @@ -0,0 +1,4 @@ + + +go get -u github.com/go-sql-driver/mysql +go get github.com/google/uuid \ No newline at end of file diff --git a/main.go b/main.go new file mode 100644 index 0000000..1251a98 --- /dev/null +++ b/main.go @@ -0,0 +1,307 @@ +package main + +import ( + "database/sql" + "fmt" + mqtt "github.com/eclipse/paho.mqtt.golang" + _ "github.com/go-sql-driver/mysql" + "github.com/google/uuid" + "go_mqtt/models" + _ "go_mqtt/models" + "go_mqtt/mydb" + _ "go_mqtt/mydb" + "log" + "strings" + "time" +) + +func getMySqlDB() (*sql.DB, error){ + dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local" + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Fatal(err) + return nil,err + } + db.SetConnMaxLifetime(time.Minute * 5) // 连接最大生命周期 + db.SetMaxOpenConns(50) // 最大连接数 + db.SetMaxIdleConns(10) // 最大空闲连接数 + + //defer mydb.Close() + + err = db.Ping() // 检查连接是否成功建立。如果失败,则返回错误。 + if err != nil { + // 处理连接失败的情况。例如,打印错误信息并退出程序。 + log.Println("47.242.184.139:3306连接失败") + fmt.Println("Connection failed:", err) + return nil,err + } else { + log.Println("47.242.184.139:3306连接成功") + fmt.Println("Connected successfully") + //复制代码到你的Go文件中,并根据你的实际情况修改DSN中的用户名、密码、主机名、端口和数据库名。 + //然后运行你的程序。如果一切正常,你应该能够看到查询结果被打印出来。 + //如果遇到任何问题,检查你的DSN是否正确,以及MySQL服务器是否正在运行并允许连接。 + //确保你的防火墙设置允许你的应用程序访问MySQL端口(默认是3306)。 + //如果你使用的是远程数据库,确保网络设置允许你从当前位置访问数据库服务器。 + return db,nil + } +} + +func moreDBConnect() { + db1, err := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306) + if err != nil { + fmt.Println("Error connecting to db1:", err) + return + } + defer db1.Db.Close() + + fmt.Println("Connected to databases:", db1.Name) + + //db2, err := mydb.NewDatabase("db2", "user2", "password2", "localhost", 3306) + //if err != nil { + // fmt.Println("Error connecting to db2:", err) + // return + //} + //defer db2.Conn.Close() + // + //fmt.Println("Connected to databases:", db1.Name, "and", db2.Name) +} + + + +func queryOneDataByMySql(db *sql.DB) { + //DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据: + var temperature models.Temperature + err := db.QueryRow("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?",1).Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic) + if err != nil { + fmt.Println("QueryRow failed:", err) + return + } + fmt.Printf("Temperature Id:%d\n", temperature.Id) + fmt.Printf("Temperature: %+v\n", temperature) +} + +func queryMoreDataByMySql(db *sql.DB) { + //DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据: + rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0,30) + if err != nil { + fmt.Println("Query failed:", err) + return + } + defer rows.Close() // 确保在使用完毕后关闭资源 + + items := make([]models.Temperature, 0) + for rows.Next() { + var temperature models.Temperature + if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil { + fmt.Println("Scan failed:", err) + return + } + items = append(items, temperature) + fmt.Println(items) + } +} + +//插入单条数据 +func insertDataByMySql(db *sql.DB,temperature *models.Temperature) { + //插入操作: + stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)") + if err != nil { + fmt.Println("Prepare failed:", err) + return + } + res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic) + if err != nil { + fmt.Println("Exec failed:", err) + return + } + lastID, _ := res.LastInsertId() + fmt.Printf("Inserted Temperature with ID: %d\n", lastID) +} + +//事务处理:保证数据一致性 +func insertMoreDataByMySql(db *sql.DB,temperature *models.Temperature) { + tx, err := db.Begin() + if err != nil { + fmt.Println("Begin transaction failed:", err) + return + } + _, err = tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1) + if err != nil { + tx.Rollback() // 如果发生错误,回滚事务 + fmt.Println("Exec failed:", err) + return + } + err = tx.Commit() // 提交事务 + if err != nil { + fmt.Println("Commit failed:", err) + return + } + fmt.Println("Transaction committed successfully") +} + + +func dbTest() { + // 初始化数据库连接 + DB, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test") + if err != nil { + fmt.Println("Error opening DB:", err) + return + } + defer DB.Close() + + DB.SetConnMaxLifetime(time.Minute * 5) + DB.SetMaxOpenConns(50) + DB.SetMaxIdleConns(10) + + // 测试数据库连接 + if err := DB.Ping(); err != nil { + fmt.Println("Connection failed:", err) + return + } + fmt.Println("Connected successfully") + + // 查询数据 + rows, _ := DB.Query("SELECT id, name FROM users") + defer rows.Close() + + for rows.Next() { + var id int + var name string + rows.Scan(&id, &name) + fmt.Printf("ID: %d, Name: %s\n", id, name) + } + + // 插入数据 + stmt, _ := DB.Prepare("INSERT INTO users(name, age) VALUES(?, ?)") + stmt.Exec("Jane Doe", 28) + + // 删除数据 + stmt, _ = DB.Prepare("DELETE FROM users WHERE id = ?") + stmt.Exec(1) +} + + +func timing(client mqtt.Client) { + //定时器,10秒钟执行一次 + ticker := time.NewTicker(5 * time.Second) + for { + time := <-ticker.C + fmt.Println("定时器====>", time.String()) + if !client.IsConnectionOpen() { + client.Connect() + } else { + fmt.Println("client has connect") + //moreDBConnect() + db, err := getMySqlDB() + if err == nil { + queryOneDataByMySql(db) + //mydb.Close() + } + } + } +} + +var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) + + message := fmt.Sprintf("%s",msg.Payload()) + topic := fmt.Sprintf("%s",msg.Topic()) + fmt.Println(message) + result := strings.Split(message, " ") + fmt.Println(result[0]) + fmt.Println(result[1]) + + now := time.Now() + temperature := new(models.Temperature) + temperature.CreateDate=now + temperature.DataDate=now.Format("2006-01-02") + temperature.DataHour=now.Format("2006-01-02 15") + temperature.DataMinute=now.Format("2006-01-02 15:04") + temperature.Humidity=result[0] + temperature.Temperature=result[1] + temperature.Topic=topic + + if topic=="WifiSHT/7C87CE9CA4E6/SHT20" { + temperature.LocationDesc="广东省珠海市高新区唐家湾镇东岸村水风三街28号501" + } + if topic=="WifiSHT/7C87CE9F5CBF/SHT20" { + temperature.LocationDesc="广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房" + } + if topic=="WifiSHT/4CEBD686B6AA/SHT20" { + temperature.LocationDesc="广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号" + } + + db, err := getMySqlDB() + if err == nil { + insertDataByMySql(db,temperature) + //mydb.Close() + } +} + +var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { + fmt.Println("Connected") +} + +var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { + fmt.Printf("Connect lost: %v", err) +} + +func sub(client mqtt.Client) { + topic := "gomqtt01" + token := client.Subscribe(topic, 1, nil) + token.Wait() + fmt.Printf("Subscribed to topic: %s", topic) +} + +func subTemperature(client mqtt.Client) { + topic := "WifiSHT/+/SHT20" + token := client.Subscribe(topic, 1, nil) + token.Wait() + fmt.Printf("Subscribed to topic: %s", topic) +} + +func publish(client mqtt.Client) { + num := 999999000000000000 + for i := 0; i < num; i++ { + text := fmt.Sprintf("Message %d", i) + token := client.Publish("gomqtt01", 0, false, text) + token.Wait() + fmt.Println(i) + time.Sleep(6 * time.Second) + } +} + + + + +func main() { + + uuid := uuid.New() + fmt.Println("Generated UUID:", uuid) + + var broker = "47.242.184.139" + var port = 1883 + opts := mqtt.NewClientOptions() + opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) + opts.SetClientID(uuid.String()) + opts.SetUsername("admin") + opts.SetPassword("publish452131wW452131wW$") + opts.SetDefaultPublishHandler(messagePubHandler) + opts.OnConnect = connectHandler + opts.OnConnectionLost = connectLostHandler + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } + + //client.IsConnectionOpen() + + go timing(client) + + //sub(client) + subTemperature(client) + publish(client) + + //client.Disconnect(1000) + +} diff --git a/models/Temperature.go b/models/Temperature.go new file mode 100644 index 0000000..24518e8 --- /dev/null +++ b/models/Temperature.go @@ -0,0 +1,15 @@ +package models + +import "time" + +type Temperature struct { + Id int64 + CreateDate time.Time + DataDate string + DataHour string + DataMinute string + Humidity string + LocationDesc string + Temperature string + Topic string +} diff --git a/mydb/ManageDb.go b/mydb/ManageDb.go new file mode 100644 index 0000000..13bd25c --- /dev/null +++ b/mydb/ManageDb.go @@ -0,0 +1,24 @@ +package mydb + +import ( + "database/sql" + "fmt" + "time" +) + +type ManageDb struct { + Name string + Db *sql.DB +} + +func NewDatabase(dbName, user, password, host string, port int) (*ManageDb, error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, dbName) + db, err := sql.Open("mysql", dsn) + db.SetConnMaxLifetime(time.Minute * 5) // 连接最大生命周期 + db.SetMaxOpenConns(50) // 最大连接数 + db.SetMaxIdleConns(10) // 最大空闲连接数 + if err != nil { + return nil, err + } + return &ManageDb{Name: dbName, Db: db}, nil +} \ No newline at end of file