App下載

未讀消息(小紅點(diǎn)),前端 與 RabbitMQ 實(shí)時(shí)消息推送實(shí)踐,賊簡(jiǎn)單~

猿友 2020-09-09 10:39:50 瀏覽數(shù) (3229)
反饋

文章轉(zhuǎn)載自公眾號(hào):程序員內(nèi)點(diǎn)事

前幾天粉絲群里有個(gè)小伙伴問(wèn)過(guò):web 頁(yè)面的未讀消息(小紅點(diǎn))怎么實(shí)現(xiàn)比較簡(jiǎn)單,剛好本周手頭有類似的開(kāi)發(fā)任務(wù),索性就整理出來(lái)供小伙伴們參考,沒(méi)準(zhǔn)哪天就能用得上呢。

MQTT協(xié)議應(yīng)用場(chǎng)景

web 端實(shí)時(shí)消息推送,常用的實(shí)現(xiàn)方式比較多,但萬(wàn)變不離其宗,底層基本上還是依賴于 websocketMQTT 協(xié)議也不例外。

RabbitMQ 搭建

RabbitMQ的基礎(chǔ)搭建就不詳細(xì)說(shuō)了,自行百度一步一步搞問(wèn)題不大,這里主要說(shuō)一下兩個(gè)比較重要的配置。

1、開(kāi)啟 mqtt 協(xié)議

默認(rèn)情況下RabbitMQ是不開(kāi)啟MQTT 協(xié)議的,所以需要我們手動(dòng)的開(kāi)啟相關(guān)的插件,而RabbitMQMQTT 協(xié)議分為兩種。

第一種 rabbitmq_mqtt 提供與后端服務(wù)交互使用,對(duì)應(yīng)端口1883。

rabbitmq-plugins enable rabbitmq_mqtt

第二種 rabbitmq_web_mqtt 提供與前端交互使用,對(duì)應(yīng)端口15675。

rabbitmq-plugins enable rabbitmq_web_mqtt 

RabbitMQ 管理后臺(tái)看到如下的顯示,就表示MQTT 協(xié)議開(kāi)啟成功,到這中間件環(huán)境就搭建完畢了。

協(xié)議對(duì)應(yīng)端口號(hào)

使用MQTT 協(xié)議默認(rèn)的交換機(jī) Exchangeamp.topic,而我們訂閱的主題會(huì)在 Queues 注冊(cè)一個(gè)客戶端隊(duì)列,路由 Routing key 就是我們?cè)O(shè)置的主題。

交換機(jī)信息

服務(wù)端消息發(fā)送

web 端實(shí)時(shí)消息推送一般都是單向的推送,前端接收服務(wù)端推送的消息顯示即可,所以就只實(shí)現(xiàn)消息發(fā)送即可。

1、mqtt 客戶端依賴包

引入 spring-integration-mqttorg.eclipse.paho.client.mqttv3兩個(gè)工具包實(shí)現(xiàn)

<!--mqtt依賴包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

2、消息發(fā)送者

消息的發(fā)送比較簡(jiǎn)單,主要是應(yīng)用到 @ServiceActivator 注解,需要注意messageHandler.setAsync屬性,如果設(shè)置成 false,關(guān)閉異步模式發(fā)送消息時(shí)可能會(huì)阻塞。

@Configuration
public class IotMqttProducerConfig {


    @Autowired
    private MqttConfig mqttConfig;


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }


    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
        messageHandler.setAsync(false);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
}

MQTT 對(duì)外提供發(fā)送消息的 API 時(shí),需要使用 @MessagingGateway注解,去提供一個(gè)消息網(wǎng)關(guān)代理,參數(shù) defaultRequestChannel 指定發(fā)送消息綁定的channel。

可以實(shí)現(xiàn)三種API接口,payload 為發(fā)送的消息,topic 發(fā)送消息的主題,qos 消息質(zhì)量。

@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {


    // 向默認(rèn)的 topic 發(fā)送消息
    void sendMessage2Mqtt(String payload);
    // 向指定的 topic 發(fā)送消息
    void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    // 向指定的 topic 發(fā)送消息,并指定服務(wù)質(zhì)量參數(shù)
    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

前端消息訂閱

前端使用與服務(wù)端對(duì)應(yīng)的工具 paho-mqtt mqttws31.js實(shí)現(xiàn),實(shí)現(xiàn)方式與傳統(tǒng)的 websocket 方式差不多,核心方法 client = new Paho.MQTT.Client 和 各種監(jiān)聽(tīng)事件,代碼比較簡(jiǎn)潔。

注意:要保證前后端 clientId的全局唯一性,我這里就簡(jiǎn)單用隨機(jī)數(shù)解決了

<script type="text/javascript">
        // mqtt協(xié)議rabbitmq服務(wù)
        var brokerIp = location.hostname;
        // mqtt協(xié)議端口號(hào)
        var port = 15675;
        // 接受推送消息的主題
        var topic = "push_message_topic";


        // mqtt連接
        client = new Paho.MQTT.Client(brokerIp, port, "/ws", "clientId_" + parseInt(Math.random() * 100, 10));


        var options = {
            timeout: 3, //超時(shí)時(shí)間
            keepAliveInterval: 30,//心跳時(shí)間
            onSuccess: function () {
                console.log(("連接成功~"));
                client.subscribe(topic, {qos: 1});
            },
            onFailure: function (message) {
                console.log(("連接失敗~" + message.errorMessage));
            }
        };
        // 考慮到https的情況
        if (location.protocol == "https:") {
            options.useSSL = true;
        }
        client.connect(options);
        console.log(("已經(jīng)連接到" + brokerIp + ":" + port));


        // 連接斷開(kāi)事件
        client.onConnectionLost = function (responseObject) {
            console.log("失去連接 - " + responseObject.errorMessage);
        };


        // 接收消息事件
        client.onMessageArrived = function (message) {
            console.log("接受主題: " + message.destinationName + "的消息: " + message.payloadString);
            $("#arrivedDiv").append("<br/>"+message.payloadString);
            var count = $("#count").text();
            count = Number(count) + 1;
            $("#count").text(count);
        };


        // 推送給指定主題
        function sendMessage() {
            var a = $("#message").val();
            if (client.isConnected()) {
                var message = new Paho.MQTT.Message(a);
                message.destinationName = topic;
                client.send(message);
            }
        }
    </script>

測(cè)試

前后端的代碼并不多,接下來(lái)我們測(cè)試一下,弄了個(gè)頁(yè)面看看效果。

首先用 postman 模擬后端發(fā)送消息

http://127.0.0.1:8080/fun/sendMessage?message=我是程序員內(nèi)點(diǎn)事&topic=push_message_topic

模擬發(fā)送消息

再看一下前端訂閱消息的效果,看到消息被實(shí)時(shí)推送到了前端,這里只做了未讀消息數(shù)量統(tǒng)計(jì),一般還會(huì)做未讀消息詳情列表。

實(shí)時(shí)消息推送動(dòng)圖

總結(jié)

未讀消息是一個(gè)十分常見(jiàn)的功能,不管是 web端還是移動(dòng)端系統(tǒng)都是必備的模塊,MQTT 協(xié)議只是其中的一種實(shí)現(xiàn)方式,還是有必要掌握一種方法。具體用什么工具實(shí)現(xiàn)還是要看具體的業(yè)務(wù)場(chǎng)景和學(xué)習(xí)成本,像我用RabbitMQ 做還考慮到一些運(yùn)維成本在里邊。

以上就是W3Cschool編程獅關(guān)于未讀消息(小紅點(diǎn)),前端 與 RabbitMQ 實(shí)時(shí)消息推送實(shí)踐,賊簡(jiǎn)單~的相關(guān)介紹了,希望對(duì)大家有所幫助。

0 人點(diǎn)贊