wkim/lib/manager/connect_manager_web.dart
2025-01-11 14:06:09 +08:00

627 lines
19 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:connectivity/connectivity.dart';
import 'package:flutter/foundation.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:uuid/uuid.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:wukongimfluttersdk/db/const.dart';
import 'package:wukongimfluttersdk/db/wk_db_helper.dart';
import 'package:wukongimfluttersdk/entity/channel.dart';
import 'package:wukongimfluttersdk/entity/channel_member.dart';
import 'package:wukongimfluttersdk/entity/msg.dart';
import 'package:wukongimfluttersdk/manager/my_wk_socket_base.dart';
import 'package:wukongimfluttersdk/manager/my_wk_socket_channel.dart';
import 'package:wukongimfluttersdk/proto/write_read.dart';
import 'package:wukongimfluttersdk/wkim.dart';
import 'package:wukongimfluttersdk/common/crypto_utils.dart';
import '../common/logs.dart';
import '../entity/conversation.dart';
import '../proto/packet.dart';
import '../proto/proto.dart';
import '../type/const.dart';
class WKWebConnectionManager {
/// Private constructor; the only instance is [_instance].
WKWebConnectionManager._privateConstructor();
/// The single shared instance backing [shared].
static final WKWebConnectionManager _instance =
WKWebConnectionManager._privateConstructor();
/// Returns the shared connection manager.
static WKWebConnectionManager get shared => _instance;
/// Logout flag. Reset to `false` at the start of [connect] and set to `true`
/// by every [disconnect] call; the socket error callback skips reconnecting
/// when this is `true` and no connection attempt is in progress.
bool _isLogout = false;
/// Whether the network is currently considered lost (used for reconnecting
/// after a network failure). Set by the network check timer; while `true`,
/// [_sendPacket] does not write to the socket.
bool _isReconnection = false;
/// One-shot timer that re-runs [connect] after [reconnMilliseconds].
Timer? socketConnectioningTimer;
/// Whether a connection attempt is in progress. Heartbeats only start once a
/// successful connack has cleared this flag.
bool _isSocketConnectioning = false;
/// Delay, in milliseconds, before reconnecting after the link drops.
final int reconnMilliseconds = 5000;
/// Heartbeat timer.
Timer? heartTimer;
/// Heartbeat timer interval.
final heartIntervalSecond = const Duration(seconds: 30);
/// Network status polling timer.
Timer? checkNetworkTimer;
/// Network status polling interval.
final checkNetworkSecond = const Duration(seconds: 1);
/// Messages currently being sent, keyed by the packet's `clientSeq`.
final LinkedHashMap<int, SendingMsg> _sendingMsgMap = LinkedHashMap();
/// Connection status listeners, keyed by the caller-supplied key. Created
/// lazily on the first registration.
HashMap<String, Function(int, int?, ConnectionInfo?)>? _connectionListenerMap;
/// Socket wrapper for the current connection; `null` when there is none.
MyWkSocketBase? _socket;
/// Registers [back] as a connection status listener under [key] (public API).
///
/// The listener map is created on first use; a listener already stored under
/// the same [key] is replaced.
addOnConnectionStatus(String key, Function(int, int?, ConnectionInfo?) back) {
  final listeners = _connectionListenerMap ??= HashMap();
  listeners[key] = back;
}
/// Removes the connection status listener stored under [key] (public API).
///
/// Does nothing when no listener has ever been registered.
removeOnConnectionStatus(String key) {
  _connectionListenerMap?.remove(key);
}
/// Publishes a connection status change to every registered listener.
///
/// [status] is the new connection status, [reasoncode] the optional reason
/// code reported with it, and [info] optional details about the connection.
setConnectionStatus(int status, {int? reasoncode, ConnectionInfo? info}) {
  final listeners = _connectionListenerMap;
  if (listeners == null || listeners.isEmpty) {
    return;
  }
  // Dispatch over a snapshot: a listener may register or unregister
  // listeners from inside its callback, which would otherwise raise a
  // ConcurrentModificationError while the map is being iterated.
  for (final back in listeners.values.toList()) {
    back(status, reasoncode, info);
  }
}
/// Opens the socket connection.
///
/// Requires either a configured address or an address resolver (`getAddr`)
/// and a non-empty uid and token; otherwise it logs and returns. Any existing
/// connection is torn down first. When an address resolver is configured it
/// takes precedence over the static address.
connect() {
  final options = WKIM.shared.options;
  final addr = options.addr;
  final addrMissing = addr == null || addr == "";
  if (addrMissing && options.getAddr == null) {
    Logs.info("没有配置addr");
    return;
  }
  final credentialsMissing = options.uid == "" ||
      options.uid == null ||
      options.token == "" ||
      options.token == null;
  if (credentialsMissing) {
    Logs.error("没有初始化uid或token");
    return;
  }
  _isLogout = false;
  // Tear down the previous connection without clearing the credentials.
  disconnect(false);
  final resolveAddr = options.getAddr;
  if (resolveAddr == null) {
    _socketConnect(addr!);
    return;
  }
  resolveAddr((String resolved) {
    _socketConnect(resolved);
  });
}
/// Closes the socket and stops all timers.
///
/// When [isLogout] is `true` the stored uid and token are cleared, messages
/// still being sent are marked as failed and dropped, and the database is
/// closed.
///
/// NOTE(review): `_isLogout` is set to `true` on every call, including the
/// `disconnect(false)` issued by [connect] — confirm this is intended.
/// NOTE(review): the final status is published through
/// `WKIM.shared.connectionManager` rather than this instance's
/// [setConnectionStatus] — verify both reach the same listeners.
disconnect(bool isLogout) {
  _isLogout = true;
  _socket?.close();
  if (isLogout) {
    WKIM.shared.options
      ..uid = ''
      ..token = '';
    WKIM.shared.messageManager.updateSendingMsgFail();
    // The in-flight message map must be emptied as well.
    _sendingMsgMap.clear();
    WKDBHelper.shared.close();
  }
  _closeAll();
  WKIM.shared.connectionManager.setConnectionStatus(WKConnectStatus.fail);
}
/// Performs the actual socket connection to [addr].
///
/// Marks the connection attempt as in progress, publishes the `connecting`
/// status, wraps a new WebSocket channel and attaches the listeners. Any
/// exception thrown on the way schedules a reconnect and is logged.
_socketConnect(String addr) {
  try {
    _isSocketConnectioning = true;
    setConnectionStatus(WKConnectStatus.connecting);
    final target = Uri.parse(addr);
    _socket = MyWkSocketChannel.newSocket(WebSocketChannel.connect(target));
    _connectSuccess();
  } catch (e) {
    _connectFail(e);
    Logs.error("websocket connect error, ${e.toString()}");
  }
}
/// Attaches the data, open and error listeners to the freshly created socket
/// and sends the connect packet.
///
/// On a socket error a reconnect is scheduled, unless the manager is flagged
/// as logged out and no connection attempt is in progress.
///
/// NOTE(review): the connect packet is sent both from the open callback and
/// immediately after the listeners are attached — confirm the double send is
/// intended.
_connectSuccess() {
  _socket?.listen(onData: (Uint8List bytes) {
    // Incoming bytes go through packet reassembly.
    _cutDatas(bytes);
  }, onOpen: () {
    // Send the connect packet once the socket reports open.
    _sendConnectPacket();
  }, error: () {
    if (!_isLogout || _isSocketConnectioning) {
      _startSocketConnectioningTimer();
      return;
    }
    Logs.debug("登出了");
  });
  _sendConnectPacket();
}
/// Schedules a single reconnect attempt after [reconnMilliseconds],
/// replacing any reconnect that is already pending.
_startSocketConnectioningTimer() {
  _stopSocketConnectioningTimer();
  final delay = Duration(milliseconds: reconnMilliseconds);
  socketConnectioningTimer = Timer(delay, () {
    _stopSocketConnectioningTimer();
    connect();
  });
}
/// Cancels the pending reconnect timer, if any, and clears the reference.
_stopSocketConnectioningTimer() {
  socketConnectioningTimer?.cancel();
  socketConnectioningTimer = null;
}
/// Handles a failed socket connection by scheduling a reconnect.
///
/// [error] is accepted for the caller's convenience but is not inspected.
_connectFail(error) => _startSocketConnectioningTimer();
/// Test hook: feeds [data] straight into the packet reassembly logic.
testCutData(Uint8List data) => _cutDatas(data);
/// Reassembly buffer: bytes received so far that have not yet been consumed
/// as complete packets.
Uint8List? _cacheData;

/// Consumes bytes received from the socket and splits them into packets.
///
/// [data] is appended to whatever is left in [_cacheData]; every complete
/// packet found at the front of the buffer is passed to [_decodePacket], and
/// any trailing partial packet stays in [_cacheData] until more bytes arrive.
///
/// As parsed here, the high nibble of a packet's first byte is its type. A
/// pong is a single byte; every other known type is followed by a
/// variable-length "remaining length" field and that many body bytes.
_cutDatas(Uint8List data) {
  final pending = _cacheData;
  Uint8List lastMsgBytes;
  if (pending == null || pending.isEmpty) {
    lastMsgBytes = data;
  } else {
    // Unparsed bytes remain from the previous chunk: append the new ones.
    lastMsgBytes = Uint8List(pending.length + data.length)
      ..setRange(0, pending.length, pending)
      ..setRange(pending.length, pending.length + data.length, data);
  }
  _cacheData = lastMsgBytes;

  // Loop until the buffer is empty or a pass consumes nothing.
  int readLength = 0;
  while (lastMsgBytes.isNotEmpty && readLength != lastMsgBytes.length) {
    readLength = lastMsgBytes.length;
    ReadData readData = ReadData(lastMsgBytes);
    final packetType = readData.readUint8() >> 4;

    // Bounds-check before indexing the enum: an invalid type nibble must
    // reach the "bad packet" branch below instead of throwing a RangeError.
    final isPong = packetType < PacketType.values.length &&
        PacketType.values[packetType] == PacketType.pong;
    if (isPong) {
      Logs.debug('pong');
      _cacheData = lastMsgBytes = lastMsgBytes.sublist(1, lastMsgBytes.length);
      continue;
    }

    if (packetType >= 10) {
      // Malformed stream: drop the buffer and reconnect.
      _cacheData = null;
      connect();
      break;
    }

    if (lastMsgBytes.length < 5) {
      // Too few bytes to parse yet; wait for more.
      _cacheData = lastMsgBytes;
      break;
    }
    int remainingLength = readData.readVariableLength();
    if (remainingLength == -1) {
      // The remaining-length field itself is split across chunks.
      _cacheData = lastMsgBytes;
      break;
    }
    if (remainingLength > 1 << 21) {
      // Declared length above the 1 << 21 limit: discard the buffer.
      _cacheData = null;
      break;
    }
    // Re-encode the length to learn how many bytes the length field used.
    List<int> lengthBytes = encodeVariableLength(remainingLength);
    final packetLength = remainingLength + 1 + lengthBytes.length;
    if (packetLength > lastMsgBytes.length) {
      // Partial packet: keep it buffered until the rest arrives.
      _cacheData = lastMsgBytes;
      break;
    }
    _decodePacket(lastMsgBytes.sublist(0, packetLength));
    _cacheData =
        lastMsgBytes = lastMsgBytes.sublist(packetLength, lastMsgBytes.length);
  }
}
/// Decodes one complete packet and dispatches on its packet type.
///
/// - connack: on reason code 1 stores the negotiated protocol version and
///   server key/salt, publishes `success`, starts conversation sync (followed
///   by `syncCompleted` and a resend of pending messages), then starts the
///   heartbeat and network timers; any other reason code publishes `fail`.
/// - recv: verifies and stores the message, then acknowledges it unless the
///   packet is flagged as not persisted.
/// - sendack: records the send result and marks the message as no longer
///   resendable.
/// - disconnect: logs out and publishes `kicked`.
/// - pong: logged only.
_decodePacket(Uint8List data) {
  final packet = WKIM.shared.options.proto.decode(data);
  Logs.debug('解码出包->$packet');
  final type = packet.header.packetType;

  if (type == PacketType.connack) {
    final connackPacket = packet as ConnackPacket;
    if (connackPacket.reasonCode != 1) {
      setConnectionStatus(WKConnectStatus.fail,
          reasoncode: connackPacket.reasonCode);
      Logs.debug('连接失败!错误->${connackPacket.reasonCode}');
      return;
    }
    Logs.debug('连接成功!');
    WKIM.shared.options.protoVersion = connackPacket.serviceProtoVersion;
    CryptoUtils.setServerKeyAndSalt(
        connackPacket.serverKey, connackPacket.salt);
    setConnectionStatus(WKConnectStatus.success,
        reasoncode: connackPacket.reasonCode,
        info: ConnectionInfo(connackPacket.nodeId));
    try {
      WKIM.shared.conversationManager.setSyncConversation(() {
        setConnectionStatus(WKConnectStatus.syncCompleted);
        _resendMsg();
      });
    } catch (e) {
      Logs.error(e.toString());
    }
    // The connection attempt is over; start the periodic timers.
    _isSocketConnectioning = false;
    _startHeartTimer();
    _startCheckNetworkTimer();
    return;
  }

  if (type == PacketType.recv) {
    Logs.debug('收到消息');
    final recvPacket = packet as RecvPacket;
    _verifyRecvMsg(recvPacket);
    if (!recvPacket.header.noPersist) {
      _sendReceAckPacket(
          recvPacket.messageID, recvPacket.messageSeq, recvPacket.header);
    }
    return;
  }

  if (type == PacketType.sendack) {
    final sendack = packet as SendAckPacket;
    WKIM.shared.messageManager.updateSendResult(sendack.messageID,
        sendack.clientSeq, sendack.messageSeq, sendack.reasonCode);
    _sendingMsgMap[sendack.clientSeq]?.isCanResend = false;
    return;
  }

  if (type == PacketType.disconnect) {
    disconnect(true);
    setConnectionStatus(WKConnectStatus.kicked);
    return;
  }

  if (type == PacketType.pong) {
    Logs.info('pong...');
  }
}
/// Resets all socket-related state: clears the reconnection and connecting
/// flags, stops every timer, and closes and drops the socket.
_closeAll() {
  _isReconnection = false;
  _isSocketConnectioning = false;

  _stopSocketConnectioningTimer();
  _stopCheckNetworkTimer();
  _stopHeartTimer();

  _socket?.close();
  _socket = null;
}
/// Sends a receive-acknowledgement packet for the message identified by
/// [messageID] and [messageSeq], copying the `noPersist`, `syncOnce` and
/// `showUnread` flags from the received packet's [header].
_sendReceAckPacket(BigInt messageID, int messageSeq, PacketHeader header) {
  final ack = RecvAckPacket()
    ..header.noPersist = header.noPersist
    ..header.syncOnce = header.syncOnce
    ..header.showUnread = header.showUnread
    ..messageID = messageID
    ..messageSeq = messageSeq;
  _sendPacket(ack);
}
/// Builds and sends the connect packet once the socket is up.
///
/// (Re)initialises the crypto helper, resolves the device ID, and sends the
/// uid, token, protocol version, base64-encoded DH public key, device ID and
/// current client timestamp, flagged with the app device flag.
_sendConnectPacket() async {
  CryptoUtils.init();
  final deviceID = await _getDeviceID();
  // Options are read after the await, as the credentials may have changed.
  final options = WKIM.shared.options;
  final connectPacket = ConnectPacket(
      uid: options.uid!,
      token: options.token!,
      version: options.protoVersion,
      clientKey: base64Encode(CryptoUtils.dhPublicKey!),
      deviceID: deviceID,
      clientTimestamp: DateTime.now().millisecondsSinceEpoch)
    ..deviceFlag = WKIM.shared.deviceFlagApp;
  _sendPacket(connectPacket);
}
/// Encodes [packet] and writes it to the socket.
///
/// Nothing is written while the manager is flagged as waiting for the
/// network to return, or when there is no socket.
_sendPacket(Packet packet) async {
  final encoded = WKIM.shared.options.proto.encode(packet);
  if (_isReconnection) {
    return;
  }
  await _socket?.send(encoded);
}
/// Starts polling the network state every [checkNetworkSecond].
///
/// While there is no connectivity the manager is flagged as reconnecting,
/// pending messages are re-checked and the `noNetwork` status is published.
/// When connectivity returns after such a loss the flag is cleared and
/// [connect] is called.
_startCheckNetworkTimer() {
  _stopCheckNetworkTimer();
  checkNetworkTimer = Timer.periodic(checkNetworkSecond, (timer) async {
    final state = await Connectivity().checkConnectivity();
    if (state != ConnectivityResult.none) {
      if (_isReconnection) {
        _isReconnection = false;
        connect();
      }
      return;
    }
    _isReconnection = true;
    Logs.debug('网络断开了');
    _checkSedingMsg();
    setConnectionStatus(WKConnectStatus.noNetwork);
  });
}
/// Stops polling the network state and clears the timer reference.
_stopCheckNetworkTimer() {
  checkNetworkTimer?.cancel();
  checkNetworkTimer = null;
}
/// Starts the heartbeat: sends a ping packet every [heartIntervalSecond],
/// replacing any heartbeat timer that is already running.
_startHeartTimer() {
  _stopHeartTimer();
  void sendPing(Timer _) {
    Logs.info('ping...');
    _sendPacket(PingPacket());
  }

  heartTimer = Timer.periodic(heartIntervalSecond, sendPing);
}
/// Stops the heartbeat and clears the timer reference.
_stopHeartTimer() {
  heartTimer?.cancel();
  heartTimer = null;
}
/// Sends a message.
///
/// Copies the fields of [wkMsg] into a send packet, records the packet as
/// in-flight so it can be resent, then writes it to the socket.
sendMessage(WKMsg wkMsg) {
  final outgoing = SendPacket()
    ..setting = wkMsg.setting
    ..header.noPersist = wkMsg.header.noPersist
    ..header.showUnread = wkMsg.header.redDot
    ..header.syncOnce = wkMsg.header.syncOnce
    ..channelID = wkMsg.channelID
    ..channelType = wkMsg.channelType
    ..clientSeq = wkMsg.clientSeq
    ..clientMsgNO = wkMsg.clientMsgNO
    ..topic = wkMsg.topicID
    ..expire = wkMsg.expireTime
    ..payload = wkMsg.content;
  _addSendingMsg(outgoing);
  _sendPacket(outgoing);
}
/// Verifies a received message and stores it when it is authentic.
///
/// The message ID, sequence, client message number, time, sender, channel ID,
/// channel type and payload are concatenated, AES-encrypted and MD5-hashed;
/// the result must equal the packet's `msgKey`. A mismatch is logged and the
/// message dropped; otherwise the payload is decrypted and the message saved.
_verifyRecvMsg(RecvPacket recvMsg) {
  final encryptContent = [
    recvMsg.messageID,
    recvMsg.messageSeq,
    recvMsg.clientMsgNO,
    recvMsg.messageTime,
    recvMsg.fromUID,
    recvMsg.channelID,
    recvMsg.channelType,
    recvMsg.payload
  ].join();
  final String localMsgKey =
      CryptoUtils.generateMD5(CryptoUtils.aesEncrypt(encryptContent));
  if (recvMsg.msgKey != localMsgKey) {
    Logs.error('非法消息-->期望msgKey$localMsgKey实际msgKey${recvMsg.msgKey}');
    return;
  }
  recvMsg.payload = CryptoUtils.aesDecrypt(recvMsg.payload);
  Logs.debug(recvMsg.toString());
  _saveRecvMsg(recvMsg);
}
/// Converts a received packet into a [WKMsg], stores it and notifies
/// listeners.
///
/// The message is written to the database (and its conversation refreshed)
/// only when it is not deleted for the current user, is flagged as persisted
/// and is not an internal message. Every non-internal message is pushed to
/// the new-message listeners whether or not it was stored.
_saveRecvMsg(RecvPacket recvMsg) async {
  final msg = WKMsg()
    ..header.redDot = recvMsg.header.showUnread
    ..header.noPersist = recvMsg.header.noPersist
    ..header.syncOnce = recvMsg.header.syncOnce
    ..setting = recvMsg.setting
    ..channelType = recvMsg.channelType
    ..channelID = recvMsg.channelID
    ..content = recvMsg.payload
    ..messageID = recvMsg.messageID.toString()
    ..messageSeq = recvMsg.messageSeq
    ..timestamp = recvMsg.messageTime
    ..fromUID = recvMsg.fromUID
    ..clientMsgNO = recvMsg.clientMsgNO
    ..expireTime = recvMsg.expire;
  if (msg.expireTime > 0) {
    // Expiry is relative to the message time.
    msg.expireTimestamp = msg.expireTime + msg.timestamp;
  }
  msg
    ..status = WKSendMsgResult.sendSuccess
    ..topicID = recvMsg.topic;
  msg.orderSeq = await WKIM.shared.messageManager
      .getMessageOrderSeq(msg.messageSeq, msg.channelID, msg.channelType);

  // The payload is JSON; its `type` field selects the content model.
  final dynamic contentJson = jsonDecode(msg.content);
  msg
    ..contentType = WKDBConst.readInt(contentJson, 'type')
    ..isDeleted = _isDeletedMsg(contentJson);
  msg.messageContent = WKIM.shared.messageManager
      .getMessageModel(msg.contentType, contentJson);

  // Attach the sender's channel and, for groups, their member record.
  final WKChannel? fromChannel = await WKIM.shared.channelManager
      .getChannel(msg.fromUID, WKChannelType.personal);
  if (fromChannel != null) {
    msg.setFrom(fromChannel);
  }
  if (msg.channelType == WKChannelType.group) {
    final WKChannelMember? memberChannel = await WKIM
        .shared.channelMemberManager
        .getMember(msg.channelID, WKChannelType.group, msg.fromUID);
    if (memberChannel != null) {
      msg.setMemberOfFrom(memberChannel);
    }
  }
  WKIM.shared.messageManager.parsingMsg(msg);

  final isInternal = msg.contentType == WkMessageContentType.insideMsg;
  if (msg.isDeleted == 0 && !msg.header.noPersist && !isInternal) {
    msg.clientSeq = await WKIM.shared.messageManager.saveMsg(msg);
    final WKUIConversationMsg? uiMsg = await WKIM.shared.conversationManager
        .saveWithLiMMsg(msg, msg.header.redDot ? 1 : 0);
    if (uiMsg != null) {
      WKIM.shared.conversationManager
          .setRefreshUIMsgs(<WKUIConversationMsg>[uiMsg]);
    }
  } else {
    Logs.debug(
        '消息不能存库:is_deleted=${msg.isDeleted},no_persist=${msg.header.noPersist},content_type:${msg.contentType}');
  }
  if (msg.contentType != WkMessageContentType.insideMsg) {
    WKIM.shared.messageManager.pushNewMsg(<WKMsg>[msg]);
  }
}
/// Returns `1` when the message should be treated as deleted for the current
/// user, otherwise `0`.
///
/// A message is deleted when its decoded content [jsonObject] carries a
/// `visibles` list that does not contain the logged-in uid. Content without
/// a `visibles` list is never deleted; neither is content that is not a JSON
/// object (an array, number, string or `null`), which previously crashed on
/// the dynamic index.
int _isDeletedMsg(dynamic jsonObject) {
  if (jsonObject is! Map) {
    return 0;
  }
  final visibles = jsonObject['visibles'];
  if (visibles is! List) {
    return 0;
  }
  return visibles.contains(WKIM.shared.options.uid) ? 0 : 1;
}
/// Resends every in-flight message that is still marked as resendable.
///
/// Entries already acknowledged or failed are purged first.
_resendMsg() async {
  _removeSendingMsg();
  // Iterate over a snapshot: each send is awaited, and a message queued
  // during that suspension would add a key to the live map and make its
  // iterator throw a ConcurrentModificationError.
  final pending = _sendingMsgMap.values.toList();
  for (final sendingMsg in pending) {
    // Re-check at send time; an ack may have arrived while awaiting.
    if (!sendingMsg.isCanResend) {
      continue;
    }
    Logs.debug("重发消息:${sendingMsg.sendPacket.clientSeq}");
    await _sendPacket(sendingMsg.sendPacket);
  }
}
/// Records [sendPacket] as in-flight, keyed by its `clientSeq`, after purging
/// entries that no longer need resending.
_addSendingMsg(SendPacket sendPacket) {
  _removeSendingMsg();
  final tracked = SendingMsg(sendPacket);
  _sendingMsgMap[sendPacket.clientSeq] = tracked;
}
/// Drops every in-flight entry that is no longer marked as resendable.
_removeSendingMsg() {
  _sendingMsgMap.removeWhere((_, sendingMsg) => !sendingMsg.isCanResend);
}
/// Reviews the in-flight messages: retries those not yet acknowledged and
/// marks as failed those that have used up their retries.
///
/// A message is retried at most once every 10 seconds. Once it has been
/// retried 5 times it is reported as failed and flagged as no longer
/// resendable. Entries that are not resendable are purged at the end.
_checkSedingMsg() {
  if (_sendingMsgMap.isEmpty) {
    return;
  }
  for (final entry in _sendingMsgMap.entries) {
    final wkSendingMsg = entry.value;
    if (!wkSendingMsg.isCanResend) {
      // Already acknowledged or failed: never retry, it is purged below.
      continue;
    }
    if (wkSendingMsg.sendCount >= 5) {
      WKIM.shared.messageManager.updateMsgStatusFail(entry.key);
      wkSendingMsg.isCanResend = false;
      continue;
    }
    final nowTime = DateTime.now().millisecondsSinceEpoch ~/ 1000;
    if (nowTime - wkSendingMsg.sendTime > 10) {
      // The entry is updated in place; the map needs no re-assignment.
      wkSendingMsg.sendTime = nowTime;
      wkSendingMsg.sendCount++;
      _sendPacket(wkSendingMsg.sendPacket);
      Logs.debug("消息发送失败,尝试重发中...");
    }
  }
  _removeSendingMsg();
}
}
/// Returns the device ID for the current uid.
///
/// The ID is stored in shared preferences under `<uid>_device_id`. When none
/// is stored, a new one is generated from a v4 UUID with the dashes removed
/// and saved. The returned value is the stored ID with an `F` appended.
Future<String> _getDeviceID() async {
  final preferences = await SharedPreferences.getInstance();
  final String wkUid = WKIM.shared.options.uid!;
  final key = "${wkUid}_device_id";
  final stored = preferences.getString(key);
  final String deviceID;
  if (stored == null || stored == "") {
    deviceID = const Uuid().v4().toString().replaceAll("-", "");
    // The write is started but not awaited.
    preferences.setString(key, deviceID);
  } else {
    deviceID = stored;
  }
  return "${deviceID}F";
}
/// Bookkeeping for a message that is being sent.
class SendingMsg {
  /// Stamps [sendTime] with the current time when the entry is created.
  SendingMsg(this.sendPacket)
      : sendTime = DateTime.now().millisecondsSinceEpoch ~/ 1000;

  /// The packet to send or resend.
  SendPacket sendPacket;

  /// How many times the message has been retried.
  int sendCount = 0;

  /// Time of the latest send attempt, in whole seconds since the epoch.
  int sendTime;

  /// Whether the message may still be resent.
  bool isCanResend = true;
}
/// Details passed to connection status listeners.
class ConnectionInfo {
  /// Creates connection details for the node identified by [nodeId].
  ConnectionInfo(this.nodeId);

  /// The node ID reported in the server's connack packet.
  int nodeId;
}