做uml图网站,wordpress网站地图插件,网站建设定位分析论文,上海信息技术做网站前言
小编我将用CSDN记录软件开发求学之路上亲身所得与所学的心得与知识，有兴趣的小伙伴可以关注一下！
也许一个人独行，可以走得很快，但是一群人结伴而行，才能走得更远！让我们在成长的道路上互相学习。有兴趣的小伙伴可以关注一下。
也许一个人独行，可以走得很快，但是一群人结伴而行，才能走得更远！让我们在成长的道路上互相学习，让我们共同进步，欢迎关注！
针对websocket技术的金融alltick股票实战经验：通过调用第三方wss的数据来获取实时数据，并保持性能高及效率高。
1、在springboot中引入websocket相应的jar包
<!-- Spring Boot WebSocket Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.创建webSocketConfig 暴露endpoint端点
package com.nq.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;Configuration
public class WebSocketConfig {Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}3创建websocket客户端用于连接第三方的wss
package com.nq.common;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nq.pojo.Stock;
import com.nq.service.IStockService;
import com.nq.utils.PropertiesUtil;
import com.nq.utils.StringUtils;
import com.nq.utils.redis.RedisShardedPool;
import com.nq.utils.redis.RedisShardedPoolUtils;
import com.nq.vo.stock.StockListVO;
import com.nq.vo.websocket.CodeVo;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.hpsf.Decimal;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.websocket.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** Description: websocket客户端* 一共2800条code产品数据,每个webSocketServer处理1000条数据分三个webSocketServer处理* 提高效率* Author: jade* Date: 2021/8/25 10:25*/
ClientEndpoint
Slf4j
Component //交给spring容器管理
Data
public class WebSocketJavaExample {private Session session; //会话对象private Boolean flag true; //用来拯救流的关闭标识符private MapString,StockVo stockVoMap;private ListStockVo stockList; //返回给客户端的封装数据List集合public final static Integer MAXCAP 1000; //一次性以1000条Resource //使用Resource来装配不然会为nullIStockService stockService;private ObjectMapper objectMappernew ObjectMapper();OnOpenpublic void onOpen(Session session) {this.session session;}
/**
接收第三方服务端的消息
**/OnMessagepublic void onMessage(String message) {if(message.indexOf(data) ! -1) {try {JSONObject jsonObject JSONUtil.parseObj(message);String dataStr jsonObject.getStr(data);//第三方响应的Json数据if (dataStr ! null) {
// JSONArray jsonArray JSONUtil.parseArray(dataStr);
// JSONObject jsonObject JSONUtil.parseObj(dataStr);
// jsonArray.stream().forEach(item - {JSONObject json JSONUtil.parseObj(dataStr);OptionalStockVo stockVo stockList.stream()//Optional为java8的Stream API中使用处理可能为null的元素.filter(p - json.getStr(code).equals(p.getCode().concat(.US))).findFirst();
// .filter(p - json.getStr(code).equals(p.getCode())).findFirst();stockVo.ifPresent(vo - {// 当前价格BigDecimal nowPrice new BigDecimal(json.getStr(price));BigDecimal preClosePrice vo.getPreclose_px();vo.setType(json.getStr(trade_direction));// alltick websocket 获取数据 替换原来的当前价格和涨幅vo.setNowPrice(nowPrice);// 计算涨幅BigDecimal chg nowPrice.subtract(preClosePrice).divide(preClosePrice, 4, BigDecimal.ROUND_HALF_UP).multiply(new BigDecimal(100));vo.setHcrate(chg);});log.info(OptionalStockVo send message to clientstockVo);
// });} else {log.error(data字段不是一个有效的JSON数组: {}, dataStr);}} catch (Exception e) {log.error(解析消息时发生异常: {}, e.getMessage());}}}OnClosepublic void onClose(Session session, CloseReason closeReason) {flagfalse;log.info(AllTick API Docs流已关闭,关闭原因:{},closeReason.toString());}OnErrorpublic void onError(Throwable e) {log.error(AllTick API Docs连接异常{},e.getMessage());}Asyncpublic void sendMessage(String key,String message) throws Exception {session.getBasicRemote().sendText(message);
// log.info(client:{}, AllTick API Docs 请求报文: {}, key, message);
// if (this.session ! null this.session.isOpen()) {
// this.session.getBasicRemote().sendText(message);
// } else {
// log.error(会话已关闭无法发送消息: {}, key);
// }}//websocket地址private String urlPropertiesUtil.getProperty(WebSocket.url);//token数据private String token PropertiesUtil.getProperty(WebSocket.token);public static ListWebSocketJavaExampleInfo webSocketJavaExampleList new ArrayList();PostConstructpublic void initPool() {new Thread(()-{ //另外起一个线程执行websocket不影响主线程run();}).start();}PreDestroypublic void destroy() {if (this.session ! null this.session.isOpen()) {try {this.session.close();} catch (IOException e) {log.error(关闭WebSocket连接时发生异常: {}, e.getMessage());}}}Asyncpublic void run(){try {ListStock list stockService.findStockList();int len list.size();int capacity (int) Math.ceil((double) len / MAXCAP);//向上取整
// int capacity (int) Math.ceil(len / MAXCAP);
// if (capacity1 || len % capacity ! 0 ) {
// capacity;
// }ListCodeVo codeVos new ArrayList();log.info(开始连接AllTick API Docs,请求url:{},url.concat(token));WebSocketContainer container ContainerProvider.getWebSocketContainer();URI uri new URI(url.concat(token)); // Replace with your websocket endpoint URLfor (int i 0; i capacity; i) {WebSocketJavaExample client new WebSocketJavaExample(); //多个客户client执行每个客户端执行1000条数据container.connectToServer(client, uri);ListStock list1 list.stream().skip(i * MAXCAP).limit(MAXCAP).collect(Collectors.toList());stockList new ArrayList();list1.forEach(item - {CodeVo codeVo new CodeVo();codeVo.setCode(item.getStockCode().concat(.US));
// codeVo.setCode(item.getStockCode());codeVos.add(codeVo);StockVo stockVo new StockVo();try {// 数据初始化String us RedisShardedPoolUtils.get(item.getStockGid(), 4);StockListVO stockListVO objectMapper.readValue(us, StockListVO.class);stockVo.setName(stockListVO.getName());stockVo.setCode(stockListVO.getCode());stockVo.setGid(stockListVO.getGid());stockVo.setStock_type(stockListVO.getStock_type());stockVo.setType(stockListVO.getType());stockVo.setHcrate(stockListVO.getHcrate());stockVo.setOpen_px(new BigDecimal(stockListVO.getOpen_px()));stockVo.setNowPrice(new BigDecimal(stockListVO.getNowPrice()));stockVo.setPreclose_px(new BigDecimal(stockListVO.getPreclose_px()));stockVo.setIsOption(Integer.valueOf(stockListVO.getIsOption()));stockList.add(stockVo);} catch (JsonProcessingException e) {log.info(redis数据转换对象stockListVO异常,e.getMessage());}});JSONArray symbolList new JSONArray(codeVos); // 直接将List转换为JSONArrayclient.setStockList(stockList);// 使用LinkedHashMap来保持顺序MapString, Object messageMap new LinkedHashMap();messageMap.put(cmd_id, 22004);messageMap.put(seq_id, 123);messageMap.put(trace, 3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806);MapString, Object dataMap new LinkedHashMap();dataMap.put(symbol_list, symbolList);messageMap.put(data, dataMap);// 将LinkedHashMap转换为JSONObjectJSONObject message2 new JSONObject(messageMap);String message message2.toString();
// String message {\cmd_id\:22004,\seq_id\:123,\trace\:\3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\,\data\:{\symbol_list\: JSONUtil.toJsonStr(codeVos) }};client.sendMessage(client i, message);webSocketJavaExampleList.add(new WebSocketJavaExampleInfo(client, client i,message));codeVos.clear();// 创建一个TimerTask任务int finalI i;TimerTask task3 new TimerTask() {SneakyThrowsOverridepublic void run() {//定时获取心跳try {client.sendMessage(client finalI,{\n \cmd_id\:22000,\n \seq_id\:123,\n \trace\:\3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\,\n \data\:{\n }\n });} catch (Exception e) {log.error(e.getMessage());}}};new Timer().schedule(task3, 10000,10000);new Thread().sleep(2000);}}catch (Exception e){e.printStackTrace();log.error(AllTick API Docs 连接失败:{},e.getMessage());}}
/**
定时任务应用启动后延迟 2.5 分钟开始执行之后每隔 2.5 分钟执行一次去勘测是否流关闭然后拯救连接websocket
**/Scheduled(fixedRate 1 * 15 * 10000,initialDelay 150000)public void Daemon() throws Exception {WebSocketContainer container ContainerProvider.getWebSocketContainer();URI uri new URI(url.concat(token)); // Replace with your websocket endpoint URLfor(WebSocketJavaExampleInfo webSocketJavaExampleInfo:webSocketJavaExampleList){if(!webSocketJavaExampleInfo.getClient().flag){container.connectToServer(webSocketJavaExampleInfo.getClient(), uri);webSocketJavaExampleInfo.getClient().sendMessage(webSocketJavaExampleInfo.getKey(), webSocketJavaExampleInfo.getMessage());}}}
}Data
class WebSocketJavaExampleInfo{private WebSocketJavaExample client;private String key;private String message;public WebSocketJavaExampleInfo(WebSocketJavaExample client, String key,String message) {this.client client;this.key key;this.message message;}
}4、创建websocket服务端用于连接客户端及供前端访问
package com.nq.common;import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.config.annotation.EnableWebSocket;import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;import static com.nq.common.WebSocketJavaExample.MAXCAP;
import static com.nq.common.WebSocketJavaExample.webSocketJavaExampleList;ServerEndpoint(/ws/{userId})
EnableWebSocket
Component
Data
public class WebSocketServer {private static final Logger log LoggerFactory.getLogger(WebSocketServer.class);private static final Integer pageSize 100;//页码private Integer pageNo;//页数/*** concurrent包的线程安全Set用来存放每个客户端对应的MyWebSocket对象。*/private static ConcurrentHashMapString, WebSocketServer webSocketMap new ConcurrentHashMap();/*** 与某个客户端的连接会话需要通过它来给客户端发送数据*/private Session session;/*** 接收userId*/private String userId ;/*** 查询code*/private String code ;/*** 连接建立成功调用的方法*/OnOpenpublic void onOpen(Session session, PathParam(userId) String userId) {this.session session;this.userId userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);webSocketMap.put(userId, this);} else {webSocketMap.put(userId, this);}}/*** 连接关闭调用的方法*/OnClosepublic void onClose() {if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);}}/*** 收到客户端消息后调用的方法** param message 客户端发送过来的消息*/OnMessagepublic void onMessage(String message, Session session) {log.info(用户消息: userId ,报文: message);JSONObject jsonObject JSONUtil.parseObj(message);
// pageSize jsonObject.getInt(pageSize);pageNo jsonObject.getInt(pageNo);code jsonObject.getStr(code);
// if (ObjectUtil.isNotEmpty(code)) { //如果code不为空则查询并推送数据
// queryAndSendStockVo(code, session);
// }}/*** param session* param error*/OnErrorpublic void onError(Session session, Throwable error) {log.error(用户错误: this.userId ,原因: error.getMessage());}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}/*** 群发消息** throws IOException*/PostConstructpublic void BroadCastInfo() throws IOException, InterruptedException {log.info(开始轮询批量推送数据);ThreadUtil.execAsync(() - {run();});}public void run() {WebSocketServer webSocketServer;ListStockVo list null;ListStockVo additionalList null;ListStockVo list2 null;while (true) {try {if (webSocketMap.size() 0) {for (Map.EntryString, WebSocketServer stringWebSocketServerEntry : webSocketMap.entrySet()) {webSocketServer stringWebSocketServerEntry.getValue();if (ObjectUtil.isEmpty(webSocketServer.pageNo) ObjectUtil.isNotEmpty(webSocketServer.pageSize) ObjectUtil.isEmpty(webSocketServer.getCode())) {//如果默认没有参数 就传输3千条
// if(ObjectUtil.isEmpty(webSocketServer.pageNo)) {//如果默认没有参数 就传输3千条list webSocketJavaExampleList.get(0).getClient().getStockList().stream().limit(1000).collect(Collectors.toList());} else if (ObjectUtil.isNotEmpty(webSocketServer.pageNo)) {int pageSize webSocketServer.pageSize;int pageNo webSocketServer.pageNo;Integer size pageNo * pageSize;
// int capacity (int) Math.ceil(size / 20);int capacity (int) Math.ceil(size / MAXCAP);int pageno (capacity * 1000 / pageSize);pageNo - pageno;if (capacity 0) {list webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(pageNo * pageSize).collect(Collectors.toList());}if (capacity 1) {list webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());additionalList webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());list Stream.concat(list.stream(), additionalList.stream()).collect(Collectors.toList());}if (capacity 2) {list webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());list2 webSocketJavaExampleList.get(1).getClient().getStockList().stream().collect(Collectors.toList());additionalList webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());list Stream.concat(Stream.concat(list.stream(), list2.stream()), additionalList.stream()).collect(Collectors.toList());}
// list webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip((pageNo - 1) * pageSize).limit(pageSize).collect(Collectors.toList());} else {String queryCode webSocketServer.getCode();// 使用并行流处理数据减少嵌套循环list webSocketJavaExampleList.parallelStream().flatMap(webSocketJavaExampleInfo - webSocketJavaExampleInfo.getClient().getStockList().stream()).filter(stockVo - stockVo.getCode().contains(queryCode)).collect(Collectors.toList());}try {stringWebSocketServerEntry.getValue().sendMessage(JSONUtil.toJsonStr(list));} catch (IOException e) {log.error(用户编码为:{},推送ws数据异常,异常原因{}, webSocketServer.getUserId(), e.getMessage());}}}} catch (Exception e) {log.error(推送失败: {}, e.getMessage());} finally {try {new Thread().sleep(2000);} catch (InterruptedException e) {log.error(没有客户端{},webSocketMap:{}, e.getMessage(), webSocketMap.size());}}}}}以上是基于小编在开发过程中针对websocket技术的实战经验通过调用第三方wss的的数据来获取实时数据