package com.rehome.adminclientappmqttserver; import com.google.gson.Gson; import com.rehome.adminclientappmqttserver.controller.UseryfController; import com.rehome.adminclientappmqttserver.controller.UserzyController; import com.rehome.adminclientappmqttserver.entity.Userzy; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttRSAClientZYAPP { /** * 代理服务器ip地址 */ public static final String MQTT_BROKER_HOST = "tcp://39.101.173.20:1883"; /** * 客户端唯一标识 */ public static final String MQTT_CLIENT_ID = "AppServer_ZY_APP_server_02"; /** * */ public static final String USERNAME = "admin"; /** * 密码 */ public static final String PASSWORD = "public"; /** * 订阅标识 */ public static final String TOPIC_FILTER = "app_push_zy"; private volatile static MqttClient mqttClient; private static MqttConnectOptions options; private static int qos = 2; private UserzyController userzyController; public MqttRSAClientZYAPP(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); // 配置参数信息 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(USERNAME); // 设置密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //断线重连 options.setAutomaticReconnect(true); } catch (Exception e) { e.printStackTrace(); } } public void start(UserzyController userController) { this.userzyController=userController; try { // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe(TOPIC_FILTER,qos); // 设置回调 mqttClient.setCallback(new MqttCallbackExtended(){ @Override public void connectionLost(Throwable throwable) { System.out.println("connectionLost"); try { mqttClient.reconnect(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("topic:"+s); System.out.println("Qos:"+mqttMessage.getQos()); System.out.println("message RSA:"+new String(mqttMessage.getPayload())); System.out.println("湛江中粤收到mqtt消息"); try { String messageDe = RSAAndroid.decryptByPrivateKeyForSpiltStr(new String(mqttMessage.getPayload()), RSAAndroid.privateRsaKey); System.out.println("message content:"+messageDe); Gson gson = new Gson(); Userzy userInfo = gson.fromJson(messageDe, Userzy.class); System.out.println(userInfo.getUsername()); System.out.println(userInfo.getPassword()); System.out.println(userInfo.getDate()); System.out.println(userInfo.getNfc()); userzyController.saveUserZY(userInfo); }catch (Exception e){ e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("deliveryComplete---------"+ iMqttDeliveryToken.isComplete()); } @Override public void connectComplete(boolean b, String s) { //连接成功后调用 try { mqttClient.subscribe(TOPIC_FILTER,qos);//具体订阅代码 } catch (MqttException e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } } }