diff --git a/internal/msg_gateway/gate/callback.go b/internal/msg_gateway/gate/callback.go index b2f15035e..139d89afc 100644 --- a/internal/msg_gateway/gate/callback.go +++ b/internal/msg_gateway/gate/callback.go @@ -9,7 +9,7 @@ import ( "time" ) -func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp { +func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackgroundStatusChanged bool, connID string) cbApi.CommonCallbackResp { callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} if !config.Config.Callback.CallbackUserOnline.Enable { return callbackResp @@ -27,6 +27,7 @@ func callbackUserOnline(operationID, userID string, platformID int, token string }, Seq: int(time.Now().UnixNano() / 1e6), IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged, + ConnID: connID, } callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: &callbackResp} if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOnlineCommand, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil { @@ -36,7 +37,7 @@ func callbackUserOnline(operationID, userID string, platformID int, token string return callbackResp } -func callbackUserOffline(operationID, userID string, platformID int, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp { +func callbackUserOffline(operationID, userID string, platformID int, isAppBackgroundStatusChanged bool, connID string) cbApi.CommonCallbackResp { callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} if !config.Config.Callback.CallbackUserOffline.Enable { return callbackResp @@ -53,6 +54,7 @@ func callbackUserOffline(operationID, userID string, platformID int, isAppBackgr }, Seq: int(time.Now().UnixNano() / 1e6), IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged, + ConnID: connID, } callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: &callbackResp} if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOfflineCommand, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil { diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index edf1c0c56..4c8283b10 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -407,12 +407,12 @@ func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) { req := pData.(*sdk_ws.SetAppBackgroundStatusReq) conn.IsBackground = req.IsBackground if !conn.IsBackground { - callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, true) + callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, true, conn.connID) if callbackResp.ErrCode != 0 { log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } } else { - callbackResp := callbackUserOffline(m.OperationID, conn.userID, int(conn.PlatformID), true) + callbackResp := callbackUserOffline(m.OperationID, conn.userID, int(conn.PlatformID), true, conn.connID) if callbackResp.ErrCode != 0 { log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index aa68a8e75..68bfa63c4 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -15,6 +15,7 @@ import ( "context" "encoding/gob" "io/ioutil" + "strconv" "strings" go_redis "github.com/go-redis/redis/v8" @@ -37,6 +38,7 @@ type UserConn struct { userID string IsBackground bool token string + connID string } type WServer struct { @@ -82,9 +84,9 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { log.Error(operationID, "upgrade http conn err", err.Error(), query) return } else { - newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0]} + newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0], conn.RemoteAddr().String() + strconv.Itoa(int(utils.GetCurrentTimestampBySecond()))} userCount++ - ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) + ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], newConn.connID, operationID) go ws.readMsg(newConn) } } else { @@ -319,11 +321,11 @@ func (ws *WServer) sendKickMsg(oldConn *UserConn) { } } -func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) { +func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) { rwLock.Lock() defer rwLock.Unlock() log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String()) - callbackResp := callbackUserOnline(operationID, uid, platformID, token, false) + callbackResp := callbackUserOnline(operationID, uid, platformID, token, false, connID) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) } @@ -386,7 +388,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { if err != nil { log.Error(operationID, " close err", "", "uid", uid, "platform", platform) } - callbackResp := callbackUserOffline(operationID, conn.userID, platform, false) + callbackResp := callbackUserOffline(operationID, conn.userID, platform, false, conn.connID) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } diff --git a/pkg/call_back_struct/msg_gateway.go b/pkg/call_back_struct/msg_gateway.go index 87d07c19f..235f63105 100644 --- a/pkg/call_back_struct/msg_gateway.go +++ b/pkg/call_back_struct/msg_gateway.go @@ -5,6 +5,7 @@ type CallbackUserOnlineReq struct { Token string `json:"token"` Seq int `json:"seq"` IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"` + ConnID string `json:"connID"` } type CallbackUserOnlineResp struct { @@ -13,8 +14,9 @@ type CallbackUserOnlineResp struct { type CallbackUserOfflineReq struct { UserStatusCallbackReq - Seq int `json:"seq"` - IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"` + Seq int `json:"seq"` + IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"` + ConnID string `json:"connID"` } type CallbackUserOfflineResp struct { diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index 6c9583839..7ee7c0650 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -25,7 +25,7 @@ type ExtendMsgSet struct { } type ReactionExtendMsgSet struct { - TypeKey string `bson:"type_key" json:"typeKey"` + UserKey string `bson:"user_key" json:"userKey"` Value string `bson:"value" json:"value"` } @@ -82,13 +82,20 @@ func (d *DataBases) InsertExtendMsg(ID string, index int32, msg *ExtendMsg) (msg return set.ExtendMsgNum, err } -func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, msg *ExtendMsg, msgSet *ExtendMsgSet) error { +func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, userIndex string, value string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{}) return err } +func (d *DataBases) DelOneExtendMsgSetUserKey(ID string, index, MsgIndex int32, userIndex string, userID string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + _, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}) + return err +} + func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)