diff --git a/src/msg_gateway/gate/logic.go b/src/msg_gateway/gate/logic.go index f577439cc..a79fd25c6 100644 --- a/src/msg_gateway/gate/logic.go +++ b/src/msg_gateway/gate/logic.go @@ -4,12 +4,12 @@ import ( "Open_IM/src/common/config" "Open_IM/src/common/constant" "Open_IM/src/common/log" + "Open_IM/src/grpc-etcdv3/getcdv3" pbChat "Open_IM/src/proto/chat" "Open_IM/src/utils" "context" "encoding/json" "github.com/gorilla/websocket" - "github.com/skiffer-git/grpc-etcdv3/getcdv3" "strings" ) diff --git a/src/msg_gateway/gate/rpc_server.go b/src/msg_gateway/gate/rpc_server.go index 35f7938af..e9aa28033 100644 --- a/src/msg_gateway/gate/rpc_server.go +++ b/src/msg_gateway/gate/rpc_server.go @@ -4,13 +4,13 @@ import ( "Open_IM/src/common/config" "Open_IM/src/common/constant" "Open_IM/src/common/log" + "Open_IM/src/grpc-etcdv3/getcdv3" pbRelay "Open_IM/src/proto/relay" "Open_IM/src/utils" "context" "encoding/json" "fmt" "github.com/gorilla/websocket" - "github.com/skiffer-git/grpc-etcdv3/getcdv3" "google.golang.org/grpc" "net" "strings" diff --git a/src/msg_gateway/gate/ws_server.go b/src/msg_gateway/gate/ws_server.go index a62007fc7..ffc14680d 100644 --- a/src/msg_gateway/gate/ws_server.go +++ b/src/msg_gateway/gate/ws_server.go @@ -80,23 +80,40 @@ func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error { func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) { rwLock.Lock() defer rwLock.Unlock() + if oldConn, ok := ws.wsUserToConn[uid]; ok { + err := oldConn.Close() + delete(ws.wsConnToUser, oldConn) + if err != nil { + log.ErrorByKv("close err", "", "uid", uid, "conn", conn) + } + } else { + log.InfoByKv("this user is first login", "", "uid", uid) + } ws.wsConnToUser[conn] = uid ws.wsUserToConn[uid] = conn - log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid) + log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid, "online_num", len(ws.wsUserToConn)) } func (ws *WServer) delUserConn(conn *websocket.Conn) { rwLock.Lock() defer rwLock.Unlock() + var uidPlatform string if uid, ok := ws.wsConnToUser[conn]; ok { + uidPlatform = uid if _, ok = ws.wsUserToConn[uid]; ok { delete(ws.wsUserToConn, uid) - log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "uid", uid) + log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "uid", uid, "online_num", len(ws.wsUserToConn)) + } else { + log.WarnByKv("uid not exist", "", "wsUser deleted", ws.wsUserToConn, "uid", uid, "online_num", len(ws.wsUserToConn)) } delete(ws.wsConnToUser, conn) } - conn.Close() + err := conn.Close() + if err != nil { + log.ErrorByKv("close err", "", "uid", uidPlatform, "conn", conn) + } + } func (ws *WServer) getUserConn(uid string) *websocket.Conn { diff --git a/src/msg_transfer/logic/history_msg_handler.go b/src/msg_transfer/logic/history_msg_handler.go index cdf8f7721..9a585eca2 100644 --- a/src/msg_transfer/logic/history_msg_handler.go +++ b/src/msg_transfer/logic/history_msg_handler.go @@ -5,14 +5,13 @@ import ( "Open_IM/src/common/constant" kfk "Open_IM/src/common/kafka" "Open_IM/src/common/log" + "Open_IM/src/grpc-etcdv3/getcdv3" pbMsg "Open_IM/src/proto/chat" pbPush "Open_IM/src/proto/push" - "Open_IM/src/push/content_struct" "Open_IM/src/utils" "context" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - "github.com/skiffer-git/grpc-etcdv3/getcdv3" "strings" ) @@ -87,17 +86,6 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) } else if pbData.SessionType == constant.GroupChatType { log.Info("", "", "msg_transfer chat type = GroupChatType") uidAndGroupID := strings.Split(pbData.RecvID, " ") - if pbData.ContentType == constant.AtText { - atContent := content_struct.AtTextContent{ - Text: pbData.Content, - AtUserList: pbData.ForceList, - } - if utils.IsContain(uidAndGroupID[0], pbData.ForceList) { - atContent.IsAtSelf = true - } - pbSaveData.Content = utils.StructToJsonString(atContent) - } - saveUserChat(uidAndGroupID[0], &pbSaveData) pbSaveData.Options = pbData.Options pbSaveData.OfflineInfo = pbData.OfflineInfo @@ -148,7 +136,6 @@ func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { } msgClient := pbPush.NewPushMsgServiceClient(grpcConn) _, err := msgClient.PushMsg(context.Background(), &msg) - defer grpcConn.Close() if err != nil { log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error()) pid, offset, err := producer.SendMessage(message) diff --git a/src/push/content_struct/content.go b/src/push/content_struct/content.go index 8b311686a..6821c6a52 100644 --- a/src/push/content_struct/content.go +++ b/src/push/content_struct/content.go @@ -57,6 +57,18 @@ type CreateGroupSysMsg struct { Text string `json:"text"` } +type NotificationContent struct { + IsDisplay int32 `json:"isDisplay"` + DefaultTips string `json:"defaultTips"` + Detail string `json:"detail"` +} +type KickGroupMemberApiReq struct { + GroupID string `json:"groupID"` + UidList []string `json:"uidList"` + Reason string `json:"reason"` + OperationID string `json:"operationID"` +} + func NewCreateGroupSysMsgString(create *CreateGroupSysMsg, text string) string { create.Text = text jstring, _ := json.Marshal(create) diff --git a/src/push/logic/push_rpc_server.go b/src/push/logic/push_rpc_server.go index c856e56e5..ed128790c 100644 --- a/src/push/logic/push_rpc_server.go +++ b/src/push/logic/push_rpc_server.go @@ -3,11 +3,11 @@ package logic import ( "Open_IM/src/common/config" "Open_IM/src/common/log" + "Open_IM/src/grpc-etcdv3/getcdv3" "Open_IM/src/proto/push" pbRelay "Open_IM/src/proto/relay" "Open_IM/src/utils" "context" - "github.com/skiffer-git/grpc-etcdv3/getcdv3" "google.golang.org/grpc" "net" "strings" diff --git a/src/push/logic/push_to_client.go b/src/push/logic/push_to_client.go index 11072f184..c52ff0a61 100644 --- a/src/push/logic/push_to_client.go +++ b/src/push/logic/push_to_client.go @@ -10,6 +10,7 @@ import ( "Open_IM/src/common/config" "Open_IM/src/common/constant" "Open_IM/src/common/log" + "Open_IM/src/grpc-etcdv3/getcdv3" pbChat "Open_IM/src/proto/chat" pbGroup "Open_IM/src/proto/group" pbRelay "Open_IM/src/proto/relay" @@ -20,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/skiffer-git/grpc-etcdv3/getcdv3" "strings" )