From 271934ef4b7fdff1a5a733bcf89cd6941ccc6c45 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 29 Nov 2022 18:08:47 +0800 Subject: [PATCH] fix groupMuted send msg --- internal/cron_task/clear_msg.go | 28 +++++++++++++-------- internal/cron_task/cron_task.go | 15 ++++++----- pkg/common/db/mongoModel.go | 44 +++++++++++++++++++++++++++++++-- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index f86e4adac..481b3b104 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -164,6 +164,21 @@ func msgListIsFull(chat *db.UserChat) bool { return false } +func CheckGroupUserMinSeq(operationID, groupID, userID string, diffusionType int) error { + return nil +} + +func CheckUserMinSeqWithMongo(operationID, userID string, diffusionType int) error { + //var seqRedis uint64 + //var err error + //if diffusionType == constant.WriteDiffusion { + // seqRedis, err = db.DB.GetUserMinSeq(ID) + //} else { + // seqRedis, err = db.DB.GetGroupUserMinSeq(ID) + //} + return nil +} + func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { var seqRedis uint64 var err error @@ -185,17 +200,10 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { if msg == nil { return nil } - var seqMongo uint32 - msgPb := &server_api_params.MsgData{} - err = proto.Unmarshal(msg.Msg, msgPb) - if err != nil { - return utils.Wrap(err, "") - } - seqMongo = msgPb.Seq - if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 { - log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msgPb.Status, msg.SendTime) + if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 { + log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime) } else { - log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "seq and msg OK", "status:", msgPb.Status, msg.SendTime) + log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "seq and msg OK", "status:", msg.Status, msg.SendTime) } return nil } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 1fdef997e..f08cc3db4 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -30,11 +30,8 @@ func StartCronTask(userID, workingGroupID string) { fmt.Println("clear msg finished") return } - clearFunc := func() { - ClearAll() - } c := cron.New() - _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearFunc) + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll) if err != nil { fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) panic(err) @@ -53,7 +50,6 @@ func getCronTaskOperationID() string { func ClearAll() { operationID := getCronTaskOperationID() log.NewInfo(operationID, "====================== start del cron task ======================") - //var userIDList []string var err error userIDList, err := im_mysql_model.SelectAllUserID() if err == nil { @@ -61,7 +57,6 @@ func ClearAll() { } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } - //return // working group msg clear workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { @@ -82,6 +77,9 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } + if err := CheckUserMinSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), userID, err) + } } } @@ -100,5 +98,10 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } + for _, userID := range userIDList { + if err := CheckGroupUserMinSeq(operationID, groupID, userID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) + } + } } } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 88805b902..5bd96032e 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -318,7 +318,7 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error { return err } -func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { +func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) @@ -334,13 +334,53 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { } if len(userChats) > 0 { if len(userChats[0].Msg) > 0 { - return &userChats[0].Msg[len(userChats[0].Msg)-1], nil + msgPb := &open_im_sdk.MsgData{} + err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb) + if err != nil { + return nil, utils.Wrap(err, "") + } + return msgPb, nil } return nil, errors.New("len(userChats[0].Msg) < 0") } return nil, nil } +func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + regex := fmt.Sprintf("^%s", ID) + findOpts := options.Find().SetLimit(1).SetSort(bson.M{"uid": 1}) + var userChats []UserChat + cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) + if err != nil { + return nil, err + } + err = cursor.All(ctx, &userChats) + if err != nil { + return nil, utils.Wrap(err, "") + } + var oldestMsg []byte + if len(userChats) > 0 { + for _, v := range userChats[0].Msg { + if v.SendTime != 0 { + oldestMsg = v.Msg + break + } + } + if len(oldestMsg) == 0 { + oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg + } + msgPb := &open_im_sdk.MsgData{} + err = proto.Unmarshal(oldestMsg, msgPb) + if err != nil { + return nil, utils.Wrap(err, "") + } + return msgPb, nil + } + return nil, nil +} + func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0