From 5c5f80691a480cc52070ee8affb5cddaf5f7ae11 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 11 Aug 2022 12:09:44 +0800 Subject: [PATCH 1/2] fix delete --- internal/cron_task/clear_msg.go | 12 +++++++++++- internal/cron_task/cron_task.go | 4 +++- pkg/common/db/mongoModel.go | 10 ++++++++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 3c43e3e20..078eee136 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -7,6 +7,7 @@ import ( "Open_IM/pkg/common/log" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" + goRedis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "math" ) @@ -21,10 +22,13 @@ func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) erro log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") return utils.Wrap(err, "") } + if minSeq == 0 { + return nil + } log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delMsgIDList, "minSeq", minSeq) for _, userID := range userIDList { userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) - if err != nil { + if err != nil && err != goRedis.Nil { log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) continue } @@ -46,6 +50,9 @@ func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { if err != nil { return utils.Wrap(err, "") } + if minSeq == 0 { + return nil + } log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", delMsgIDList, "minSeq", minSeq) err = db.DB.SetUserMinSeq(userID, minSeq) return err @@ -136,6 +143,9 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { maxSeq, err = db.DB.GetGroupMaxSeq(ID) } if err != nil { + if err == goRedis.Nil { + return nil + } return utils.Wrap(err, "GetUserMaxSeq failed") } msg, err := db.DB.GetNewestMsg(ID) diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 0b1784b20..a7f90cf91 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -20,7 +20,7 @@ func StartCronTask() { fmt.Println("config", config.Config.Mongo.ChatRecordsClearTime) _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, func() { operationID := getCronTaskOperationID() - log.NewInfo(operationID, "start", utils.GetSelfFuncName()) + log.NewInfo(operationID, "====================== start del cron task ======================") userIDList, err := im_mysql_model.SelectAllUserID() if err == nil { log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) @@ -56,6 +56,8 @@ func StartCronTask() { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) return } + + log.NewInfo(operationID, "====================== start del cron finished ======================") }) if err != nil { fmt.Println("start cron failed", err.Error()) diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 47b556b36..4953aaabc 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -263,13 +263,16 @@ func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, er ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) - findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"$regex": regex}).SetSort(bson.M{"uid": 1}) + findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1}) var msgs []UserChat cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) if err != nil { return nil, err } err = cursor.Decode(&msgs) + if err != nil { + return nil, utils.Wrap(err, "") + } if len(msgs) > 0 { return &msgs[0], err } else { @@ -306,13 +309,16 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) - findOpts := options.Find().SetLimit(1).SetSort(bson.M{"$regex": regex}).SetSort(bson.M{"uid": -1}) + findOpts := options.Find().SetLimit(1).SetSort(bson.M{"uid": -1}) var userChats []UserChat cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) if err != nil { return nil, err } err = cursor.Decode(&userChats) + if err != nil { + return nil, utils.Wrap(err, "") + } if len(userChats) > 0 { if len(userChats[0].Msg) > 0 { return &userChats[0].Msg[len(userChats[0].Msg)], nil From c4084cbcec7c9d1b856de3156029d6f5c0f305e3 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 11 Aug 2022 15:55:33 +0800 Subject: [PATCH 2/2] callback kickoff --- config/config.yaml | 3 +++ internal/msg_gateway/gate/callback.go | 25 +++++++++++++++++++++++++ internal/msg_gateway/gate/ws_server.go | 4 ++++ pkg/call_back_struct/msg_gateway.go | 9 +++++++++ pkg/common/config/config.go | 1 + pkg/common/constant/constant.go | 1 + 6 files changed, 43 insertions(+) diff --git a/config/config.yaml b/config/config.yaml index fc76e3615..400f6e6aa 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -299,6 +299,9 @@ callback: callbackUserOffline: enable: false callbackTimeOut: 2 + callbackUserKickOff: + enable: false + callbackTimeOut: 2 callbackOfflinePush: enable: false callbackTimeOut: 2 diff --git a/internal/msg_gateway/gate/callback.go b/internal/msg_gateway/gate/callback.go index 9df9c6373..e0498f615 100644 --- a/internal/msg_gateway/gate/callback.go +++ b/internal/msg_gateway/gate/callback.go @@ -59,3 +59,28 @@ func callbackUserOffline(operationID, userID string, platformID int) cbApi.Commo } return callbackResp } + +func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} + if !config.Config.Callback.CallbackUserKickOff.Enable { + return callbackResp + } + callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{ + UserStatusCallbackReq: cbApi.UserStatusCallbackReq{ + UserStatusBaseCallback: cbApi.UserStatusBaseCallback{ + CallbackCommand: constant.CallbackUserKickOffCommand, + OperationID: operationID, + PlatformID: int32(platformID), + Platform: constant.PlatformIDToName(platformID), + }, + UserID: userID, + }, + Seq: int(time.Now().UnixNano() / 1e6), + } + callbackUserKickOffResp := &cbApi.CallbackUserKickOffResp{CommonCallbackResp: callbackResp} + if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackUserKickOffReq, callbackUserKickOffResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return callbackResp +} diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index c92aa6a28..aa165e572 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -233,6 +233,10 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn if err != nil { log.NewError(operationID, "conn close err", err.Error(), uid, platformID) } + callbackResp := callbackUserKickOff(operationID, uid, platformID) + if callbackResp.ErrCode != 0 { + log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) + } } else { log.NewWarn(operationID, "normal uid-conn ", uid, platformID, oldConnMap[platformID]) } diff --git a/pkg/call_back_struct/msg_gateway.go b/pkg/call_back_struct/msg_gateway.go index 2fba45114..5dbf9809b 100644 --- a/pkg/call_back_struct/msg_gateway.go +++ b/pkg/call_back_struct/msg_gateway.go @@ -18,3 +18,12 @@ type CallbackUserOfflineReq struct { type CallbackUserOfflineResp struct { CommonCallbackResp } + +type CallbackUserKickOffReq struct { + UserStatusCallbackReq + Seq int `json:"seq"` +} + +type CallbackUserKickOffResp struct { + CommonCallbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 29779116e..53082d375 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -264,6 +264,7 @@ type config struct { CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"` CallbackUserOnline callBackConfig `yaml:"callbackUserOnline"` CallbackUserOffline callBackConfig `yaml:"callbackUserOffline"` + CallbackUserKickOff callBackConfig `yaml:"callbackUserKickOff"` CallbackOfflinePush callBackConfig `yaml:"callbackOfflinePush"` CallbackOnlinePush callBackConfig `yaml:"callbackOnlinePush"` CallbackBeforeSuperGroupOnlinePush callBackConfig `yaml:"callbackSuperGroupOnlinePush"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 0eab7c022..b81cb07d6 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -200,6 +200,7 @@ const ( CallbackWordFilterCommand = "callbackWordFilterCommand" CallbackUserOnlineCommand = "callbackUserOnlineCommand" CallbackUserOfflineCommand = "callbackUserOfflineCommand" + CallbackUserKickOffCommand = "callbackUserKickOffCommand" CallbackOfflinePushCommand = "callbackOfflinePushCommand" CallbackOnlinePushCommand = "callbackOnlinePushCommand" CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand"