什么是websocket

Websocket是一款前端与后端实现长连接的技术

Websocket是基于Http1.1的技术

在nginx中websocket的使用要在location中增加如下的配置参数

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

在后端使用websocket做客户端

  • 首先需要引入jar包
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.3.5</version>
        </dependency>
  • 后台Websocket客户端写法
package com.hollysys.watermeter.alarm.websocket;

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;

/**
 * websocket Client
 * 无需调用平台
 *
 * @author niuniu
 */
public class WebsocketClient {

    public WebSocketClient client;

    public WebSocketClient startWebSocket() {
        //创建订阅
        String wsUrl = "";
        try {
            client = new WebSocketClient(new URI(wsUrl), new Draft_6455()) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    System.out.println("打开链接");
                }

                @Override
                public void onMessage(String s) {
                    System.out.println("收到消息" + s);
                }

                @Override
                public void onClose(int i, String s, boolean b) {
                    System.out.println("链接已关闭");
                }

                @Override
                public void onError(Exception e) {
                    e.printStackTrace();
                    System.out.println("发生错误已关闭");
                }
            };
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }

        client.connect();
        while (!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
            System.out.println("连接还没打开,请稍后。。。");
            try {
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("打开了");
        return client;
    }
}
  • 通过上面的代码就可以简单的去连接websocket服务器了

但是这种连接如果发生网络波动怎么办

后端通过看门狗监控websocket客户端的运行状况

  • 什么是看门狗

看门狗就是用来保家护院,保证程序在异常停止时,能够重新启动,从而保证系统能够稳定运行的一些列功能的总称

  • 看门狗设计思路

基于看门狗的定义,必须有如下几个功能

  1. 监控当前websocket服务的运行状态
  2. 当服务运行异常时,能够主动触发做特定的处理

本次看门狗做的功能主要就是:

  1. 监控状态
  2. 重连

其实你也可以让他自动输出日志,发送邮件,发送通知等等功能

  • 看门狗实现思路
  1. 要能够不间断的扫描客户端运行情况

两种实现方式:

  1. 利用spring的注解@Scheduled(fixedRate = 2000),适合单一客户端监控
  2. 自定义线程,每启动一个客户端,开启一个看门狗线程,适合后台需要管理多个客户端使用,在关闭客户端时需要先关闭看门狗,再关闭客户端。
  1. 可以准确地启动与监视要监控的客户端
  • 实现代码示例

1.线程池(推荐使用ThreadPoolExecutor作为线程池)我这里没有使用,随便写了点代码

package com.hollysys.watermeter.alarm.tool;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 线城池
 * websocket不再被需求
 * 线程池暂时不要了
 *
 * @author niuniu
 * @since 2020/7/27 10:57
 */
//@Deprecated
public class MyThreadPool {
    private static final ConcurrentHashMap<String, Thread> threadConcurrentHashMap = new ConcurrentHashMap<>();
    private static int i;

    public static Thread getThread(String uri, Runnable target) {
        if (threadConcurrentHashMap.get(uri) != null) {
            return null;
        }
        Thread thread = new Thread(target);
        thread.setName("自定义线程:" + ++i);
        threadConcurrentHashMap.put(uri, thread);
        return thread;
    }

    public static void stopThread(String uri) {
        Thread thread = threadConcurrentHashMap.get(uri);
        System.out.println(thread);
        if (thread != null) {
            thread.stop();
        }
        threadConcurrentHashMap.remove(uri);
    }
}
  1. 记录关系的类,因为当时博主项目特殊,后台既是客户端又是服务端才会有这个(删除)
package com.hollysys.watermeter.alarm.tool;

import org.java_websocket.client.WebSocketClient;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 消息数量统计工具
 * websocket功能移动到前端
 * 此处代码暂时无用
 *
 * @author niuniu
 * @since 2020/7/14 10:21
 */
//@Deprecated
public class MsgCountUtil {
    public static ConcurrentHashMap<String, WebSocketClient> userMsgSourceMap = new ConcurrentHashMap<>();

    /**
     * 暂存
     */
    public static void setWebSocketClient(String userSession, WebSocketClient client) {
        userMsgSourceMap.put(userSession, client);
    }

    /**
     * 返回
     */
    public static WebSocketClient getWebSocketClient(String userSession) {
        return userMsgSourceMap.get(userSession);
    }

    /**
     * 移除
     */
    public static void removeWebSocketClient(String userSession) {
        userMsgSourceMap.remove(userSession);
    }

}
  1. 看门狗的实现(因为里面有一些项目代码,看着会比较乱,其实删掉一些无用的“参数”!)就清晰了
package com.hollysys.watermeter.alarm.global;

import com.hollysys.watermeter.WatermeterApplication;
import com.hollysys.watermeter.alarm.domain.UriParam;
import com.hollysys.watermeter.alarm.tool.MsgCountUtil;
import com.hollysys.watermeter.alarm.tool.MyThreadPool;
import com.hollysys.watermeter.alarm.websocket.WebsocketClient;
import org.java_websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * websocket看门狗
 *
 * @author niuniu
 * @since 2020/7/10 16:30
 */
@Service
public class WebSocketWatchDog {
    /**
     * 链接状态标识
     */
    private static final Map<String, Boolean> isConnect = new ConcurrentHashMap<>();
    public static Map<String, Integer> connectCount = new ConcurrentHashMap<>();
    private static final Logger logger = LoggerFactory.getLogger(WatermeterApplication.class);

    /**
     * 修改链接状态方法
     *
     * @param status 当前状态
     */
    public static void changeIsConnect(String userSession, Boolean status) {
        isConnect.put(userSession, status);
    }

    /**
     * 断线重连
     */
    private void reConnect(String userSession, String type, List<UriParam> uriParamList) {
        Boolean aBoolean = isConnect.get(userSession);
        if (aBoolean == null) {
            aBoolean = false;
        }
        Integer integer = connectCount.get(userSession);
        if (integer == null) {
            integer = 0;
        }
        if (!aBoolean) {
            logger.info("与平台WebSocket连接断开,正在尝试第{}次重连。。。", ++integer);
            WebsocketClient websocketClient = new WebsocketClient();
            try {
                WebSocketClient client = websocketClient.startWebSocket(userSession, type, uriParamList);
                MsgCountUtil.setWebSocketClient(userSession, client);
            } catch (Exception e) {
                logger.info("链接失败,再次尝试!");
            }
        }
        connectCount.put(userSession, integer);
    }

    public void start(String userSession, String type, List<UriParam> uriParamList) {
        watch(userSession, type, uriParamList);
        logger.info("websocket启动线程已完成任务!");
    }

    /**
     * 看门狗监视
     */
    private void watch(String userSession, String type, List<UriParam> uriParamList) {
        WebSocketWatchDog webSocketWatchDog = new WebSocketWatchDog();
        //这个线程不会结束
        Thread thread = MyThreadPool.getThread(userSession, () -> {
            for (; ; ) {
                webSocketWatchDog.reConnect(userSession, type, uriParamList);
                try {
                    TimeUnit.SECONDS.sleep(5L);
                    logger.info("汪汪:看门狗正在检测websocket连接状态!");
                } catch (InterruptedException e) {
                    logger.info("线程:{},被强制退出了!", Thread.currentThread().getName());
                }
            }
        });

        if (thread == null) {
            disConnectWebsocket(userSession, type);
            //再次调用监视
            watch(userSession, type, uriParamList);
            return;
        }
        thread.start();
    }

    /**
     * 断开websocket订阅
     */
    public void disConnectWebsocket(String userSession, String type) {
        //关闭推送积累
        if ("alarm".equals(type)) {
            MsgCountUtil.clearAlarm(userSession);
        } else {
            MsgCountUtil.clearCommunication(userSession);
        }
        //关闭看门狗,清理线程
        MyThreadPool.stopThread(userSession);
        //关闭websocket订阅
        WebSocketClient client = MsgCountUtil.getWebSocketClient(userSession);
        if (client != null) {
            client.close();
        }
    }
}

这样就完成了

启动的时候,直接启动看门狗的start方法就可以启动了,因为看门狗会一直盯着你交给他的任务,它发现任务不在执行状态,就会主动启动起来!

Last modification:September 3rd, 2020 at 01:02 pm
如果觉得我的文章对你有用,请随意赞赏