diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 6e7db12d1..ae491734a 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -100,63 +100,46 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if err != nil { return 0, err } - return delStruct.getSetMinSeq(), nil + return delStruct.getSetMinSeq() + 1, nil } log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } - // lastMsgSendTime := msgs.Msg[len(msgs.Msg)-1].SendTime - - var hasMsgDoNotNeedDel bool - for i, msg := range msgs.Msg { - // 找到列表中不需要删除的消息了, 表示为递归到最后一个块 - if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { - log.NewDebug(operationID, ID, "find uid", msgs.UID) - // 删除块失败 递归结束 返回0 - hasMsgDoNotNeedDel = true - if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { - return 0, err - } - // unMarshall失败 块删除成功 设置为最小seq - msgPb := &server_api_params.MsgData{} - if err = proto.Unmarshal(msg.Msg, msgPb); err != nil { - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - // 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。 - if i > 0 { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - } - // 递归结束 - return msgPb.Seq, nil - } - } - // 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归 - // lastMsgPb := &server_api_params.MsgData{} - // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - // if err != nil { - // log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - // return 0, utils.Wrap(err, "proto.Unmarshal failed") - // } - // delStruct.minSeq = lastMsgPb.Seq - if msgListIsFull(msgs) { - log.NewDebug(operationID, "msg list is full", msgs.UID) + if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { delStruct.delUidList = append(delStruct.delUidList, msgs.UID) + lastMsgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + delStruct.minSeq = lastMsgPb.Seq } else { - // 列表没有满且没有不需要被删除的消息 代表他是最新的消息块 - if !hasMsgDoNotNeedDel { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1) + var hasMarkDelFlag bool + for _, msg := range msgs.Msg { + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1) - err = delMongoMsgsPhysical(delStruct.delUidList) - if err != nil { - return delStruct.getSetMinSeq(), err + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { + msgPb.Status = constant.MsgDeleted + bytes, _ := proto.Marshal(msgPb) + msg.Msg = bytes + msg.SendTime = 0 + hasMarkDelFlag = true + } else { + if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { + return 0, err } - return delStruct.getSetMinSeq(), nil + if hasMarkDelFlag { + if err := db.DB.UpdateOneMsgList(msgs); err != nil { + return delStruct.getSetMinSeq(), utils.Wrap(err, "") + } + } + return msgPb.Seq + 1, nil } } } @@ -179,14 +162,6 @@ func msgListIsFull(chat *db.UserChat) bool { return false } -func CheckGroupUserMinSeq(operationID, groupID, userID string) error { - return nil -} - -func CheckUserMinSeqWithMongo(operationID, userID string) error { - return nil -} - func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { var seqRedis uint64 var err error diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 0a2ba7071..88eac2df9 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -3,100 +3,102 @@ package cronTask import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/msg" server_api_params "Open_IM/pkg/proto/sdk_ws" - "Open_IM/pkg/utils" - "os/exec" + "context" + "fmt" + "strconv" + + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" "time" ) -func getMsgListFake(num int) []*pbMsg.MsgDataToMQ { - var msgList []*pbMsg.MsgDataToMQ - for i := 1; i < num; i++ { - msgList = append(msgList, &pbMsg.MsgDataToMQ{ - Token: "tk", - OperationID: "operationID", - MsgData: &server_api_params.MsgData{ - SendID: "sendID1", - RecvID: "recvID1", - GroupID: "", - ClientMsgID: "xxx", - ServerMsgID: "xxx", - SenderPlatformID: 1, - SenderNickname: "testNickName", - SenderFaceURL: "testFaceURL", - SessionType: 1, - MsgFrom: 100, - ContentType: 101, - Content: []byte("testFaceURL"), - Seq: uint32(i), - SendTime: time.Now().Unix(), - CreateTime: time.Now().Unix(), - Status: 1, - }, - }) +var ( + redisClient *redis.Client + mongoClient *mongo.Collection +) + +func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { + chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))} + for i := startSeq; i <= stopSeq; i++ { + msg := server_api_params.MsgData{ + SendID: "sendID1", + RecvID: "recvID1", + GroupID: "", + ClientMsgID: "xxx", + ServerMsgID: "xxx", + SenderPlatformID: 1, + SenderNickname: "testNickName", + SenderFaceURL: "testFaceURL", + SessionType: 1, + MsgFrom: 100, + ContentType: 101, + Content: []byte("testFaceURL"), + Seq: uint32(i), + SendTime: time.Now().Unix(), + CreateTime: time.Now().Unix(), + Status: 1, + } + bytes, _ := proto.Marshal(&msg) + sendTime := 0 + chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) } - return msgList + return chat +} + +func SetUserMaxSeq(userID string, seq int) error { + return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() +} + +func CreateChat(userChat *db.UserChat) error { + _, err := mongoClient.InsertOne(context.Background(), userChat) + return err } func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { - operationID := getCronTaskOperationID() + redisClient = redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:16379", + Password: "openIM123", // no password set + DB: 13, // use default DB + }) + mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", + "root", "openIM123", "127.0.0.1:37017", + "openIM", 100) + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri)) + mongoClient = client.Database("openIM").Collection("msg") testUID1 := "test_del_id1" //testUID2 := "test_del_id2" //testUID3 := "test_del_id3" //testUID4 := "test_del_id4" //testUID5 := "test_del_id5" //testUID6 := "test_del_id6" - testUserIDList := []string{testUID1} + err = SetUserMaxSeq(testUID1, 600) + userChat := GenUserChat(1, 500, 200, 0, testUID1) + err = CreateChat(userChat) - err := db.DB.SetUserMaxSeq(testUID1, 500) - err = db.DB.BatchInsertChat2DB(testUID1, getMsgListFake(500), testUID1+"-"+operationID, 500) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } + if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } if err != nil { - t.Error(err.Error(), testUID1) - } - //db.DB.SetUserMaxSeq(testUID1, 6000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 4999) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 30000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 9999) - //db.DB.BatchInsertChat2DB() - cmd := exec.Command("/bin/bash", "unset $CONFIG_NAME") - _, err = cmd.StdoutPipe() - if err != nil { - return - } - - //执行命令 - if err := cmd.Start(); err != nil { - return - } - for _, userID := range testUserIDList { - operationID = userID + "-" + operationID - if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } - if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } - } - - testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} - for _, groupID := range testWorkingGroupIDList { - operationID = groupID + "-" + operationID - log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) - if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } - if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } + t.Error("err is not nil", testUID1, err.Error()) } + // testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} + // for _, groupID := range testWorkingGroupIDList { + // operationID = groupID + "-" + operationID + // log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) + // if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { + // t.Error("checkMaxSeqWithMongo failed", groupID) + // } + // if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { + // t.Error("checkMaxSeqWithMongo failed", groupID) + // } + // } } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 9e4bcbfd4..484def4f5 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -8,8 +8,9 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "fmt" - "github.com/robfig/cron/v3" "time" + + "github.com/robfig/cron/v3" ) const cronTaskOperationID = "cronTaskOperationID-" @@ -57,6 +58,7 @@ func ClearAll() { } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } + // working group msg clear workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { @@ -77,9 +79,6 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } - if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), userID, err) - } } } @@ -98,10 +97,5 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } - for _, userID := range userIDList { - if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) - } - } } } diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 0c337dba8..783f62f54 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -264,8 +264,8 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr func (pushReq *PushReq) setPushChannel(title string, body string) { pushReq.PushChannel = &PushChannel{} - autoBadge := "+1" - pushReq.PushChannel.Ios = &Ios{AutoBadge: &autoBadge} + // autoBadge := "+1" + pushReq.PushChannel.Ios = &Ios{} notify := "notify" pushReq.PushChannel.Ios.NotiType = ¬ify pushReq.PushChannel.Ios.Aps.Sound = "default"