自学内容网 自学内容网

Android mqttv5

Android MQTTv5

一、导入依赖

implementation 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

二、service代码

package com.zg.fragmentdemo.mqtt;
​
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.Handler;
import android.os.IBinder;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;

import com.google.gson.Gson;

import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
​
public class MessageQueueService extends Service {
​
    private static final String TAG = MessageQueueService.class.getName();
    private final Gson gson = new Gson();
    int qos = 2;
    private String clientId;
    private String topic;
    private MqttAsyncClient mqttAsyncClient;
    private final MqttCallback mqttCallback = new MqttCallback() {
​
        @Override
        public void disconnected(MqttDisconnectResponse disconnectResponse) {
            Log.d(TAG, "disconnected " + disconnectResponse.toString());
        }
​
        @Override
        public void mqttErrorOccurred(MqttException exception) {
            Log.d(TAG, "mqttErrorOccurred " + exception.getMessage());
        }
​
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String msg = new String(message.getPayload());
            Log.d(TAG, String.format("messageArrived Topic: %s, Id: %s, QoS: %s, Message: %s, ", topic, message.getId(), message.getQos(), message));
            //这里是收到消息
        }
​
        @Override
        public void deliveryComplete(IMqttToken token) {
            Log.d(TAG, "deliveryComplete");
        }
​
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            Log.d(TAG, "connectComplete: " + topic + " reconnect: " + reconnect + " serverURI: " + serverURI);
        }
​
        @Override
        public void authPacketArrived(int reasonCode, MqttProperties properties) {
            Log.d(TAG, "authPacketArrived reasonCode: " + reasonCode + " properties: " + properties.toString());
        }
    };
​
    /**
     * 开启服务
     */
    public static void startService(Context context) {
        Log.i("zxd", "startService: ");
        context.startService(new Intent(context, MessageQueueService.class));
    }
​
    public MessageQueueService() {
​
    }
​
    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }
​
    @Override
    public void onCreate() {
        super.onCreate();
​
        topic = "ReceivePayMessage/9/1";
        clientId = UUID.randomUUID().toString();//这里也可以使用AndroidId
​
        try {
​
            MemoryPersistence persistence = new MemoryPersistence();
            mqttAsyncClient = new MqttAsyncClient("tcp://x.y.z.t:1884", clientId, persistence);
            MqttConnectionOptions mqttConnectionOptions = new MqttConnectionOptions();
            mqttConnectionOptions.setUserName("admin123");
            mqttConnectionOptions.setPassword("admin123".getBytes());
            mqttConnectionOptions.setCleanStart(false);
            mqttConnectionOptions.setConnectionTimeout(60);
            mqttConnectionOptions.setKeepAliveInterval(30);
            mqttConnectionOptions.setAutomaticReconnect(true);
            mqttAsyncClient.setCallback(mqttCallback);
            IMqttToken token = mqttAsyncClient.connect(mqttConnectionOptions);
​
            token.waitForCompletion();
            if (token.isComplete()) {
                Log.d(TAG, "Subscribe: " + topic);
                mqttAsyncClient.subscribe(topic, qos);
            }
​
        } catch (MqttException me) {
            Log.e(TAG, String.format("MqttException: %s, msg: %s, loc: %s, cause: %s", me.getReasonCode(), me.getMessage(), me.getLocalizedMessage(), me.getCause()));
        } catch (Exception e) {
            Log.e(TAG, e.getMessage());
        }
    }
​
    @Override
    public void onDestroy() {
        super.onDestroy();
        Log.d(TAG, "isConnected " + mqttAsyncClient.isConnected());
​
        if (mqttAsyncClient.isConnected()) {
            try {
                mqttAsyncClient.disconnect();
                mqttAsyncClient.close();
            } catch (MqttException e) {
                Log.d(TAG, "onDestroy " + e);
            }
            Log.d(TAG, "Close client." + mqttAsyncClient.isConnected());
        }
​
        Log.d(TAG, "onDestroy.");
    }
​
    private void broadcastImage(String session, String url) {
        Intent intent = new Intent();
        intent.setAction("main.mqtt");
        intent.putExtra("session", session);
        intent.putExtra("image", url);
        sendBroadcast(intent);
    }
​
    private void broadcastAudio(String session, String url) {
        Intent intent = new Intent();
        intent.setAction("main.mqtt");
        intent.putExtra("session", session);
        intent.putExtra("audio", url);
        sendBroadcast(intent);
    }
​
    private void broadcastStory(String session, String text) {
        Intent intent = new Intent();
        intent.setAction("main.mqtt");
        intent.putExtra("session", session);
        intent.putExtra("story", text);
        sendBroadcast(intent);
    }
​
    private void broadcastProgress(String progress) {
        Intent intent = new Intent();
        intent.setAction("main.progress");
        intent.putExtra("progress", Integer.valueOf(progress));
        sendBroadcast(intent);
    }
​
    private void toast(String message) {
        Handler handler = new Handler(Looper.getMainLooper());
        handler.post(() -> {
            Toast.makeText(getApplicationContext(), message, Toast.LENGTH_LONG).show();
        });
    }
​
    public String logcat(String filter) {
        StringBuffer buffer = new StringBuffer();
        String command = "logcat -t 100";
        if (!filter.isEmpty()) {
            command.concat(String.format(" | grep %s", filter));
        } else {
            command.concat("");
        }
        try {
            Process process = Runtime.getRuntime().exec(command);
            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String temp;
            while ((temp = reader.readLine()) != null) {
                buffer.append(temp).append("\n");
            }
​
        } catch (IOException e) {
            Log.e(TAG, e.toString());
        }
        return buffer.toString();
    }
}

在 AndroidManifest.xml 的 <application> 节点内注册该服务:

<!-- The service is only started from inside this app, so it is not exported to other apps. -->
<service
    android:name=".mqtt.MessageQueueService"
    android:enabled="true"
    android:exported="false" />

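三、使用service

注意:下面示例代码的包名(cn.netkiller.student)与上文 service 的包名(com.zg.fragmentdemo.mqtt)不一致,使用时请把 import 改为自己项目中 MessageQueueService 的实际包路径。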

package cn.netkiller.student;
 
 
import static cn.netkiller.student.cloud.Api.token;
import static cn.netkiller.student.utils.AndroidManager.fullscreen;
 
 
import cn.netkiller.student.service.MessageQueueService;
 
 
public class MainActivity extends AppCompatActivity {
    private static final String TAG = MainActivity.class.getSimpleName();
 
    private ActivityMainBinding binding;
 
    private Intent messageQueueService;
 
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        binding = ActivityMainBinding.inflate(getLayoutInflater());
        setContentView(binding.getRoot());
       
        messageQueueService = new Intent(this, MessageQueueService.class);
        startService(messageQueueService);
//        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
//            startForegroundService(new Intent(MainActivity.this, MessageQueueService.class));
//        } else {
//            startService(new Intent(MainActivity.this, MessageQueueService.class));
//        }
    }
 
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        stopService(messageQueueService);
    }
 
    @Override
    public void onResume() {
        super.onResume();
        Config.Api.token = token();
        startService(messageQueueService);
    }
 
    @Override
    public void onStop() {
        super.onStop();
        stopService(messageQueueService);
    }
}

四、参考链接

MQTTV5 异步

MQTTV5 同步

GitHub

如果要查看V3或V5的版本,可以点击Tags查看(在code界面,点Tags)

可以google搜索 “client.mqttv5 Android”、“mqttv5 android”

Android MQTTv5 代码示例

5步让你的Android应用集成MQTT功能(MQTT V5版本) vip免费

Android开发,通过使用mqtt3.1.1版本和mqtt5.0版本进行mqtt数据的发送和接收,实现和服务端数据的交互 需要积分下载

MQTT5.0

MQTT4、5所有消息解析与对比


原文地址:https://blog.csdn.net/fromVillageCoolBoy/article/details/142586802

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!