This commit is contained in:
wangchuxiao 2022-12-26 10:25:42 +08:00
parent 481f12af5e
commit 4fd31e4367
4 changed files with 115 additions and 144 deletions

View File

@ -100,63 +100,46 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
if err != nil { if err != nil {
return 0, err return 0, err
} }
return delStruct.getSetMinSeq(), nil return delStruct.getSetMinSeq() + 1, nil
} }
log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if len(msgs.Msg) > db.GetSingleGocMsgNum() { if len(msgs.Msg) > db.GetSingleGocMsgNum() {
log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID)
} }
// lastMsgSendTime := msgs.Msg[len(msgs.Msg)-1].SendTime if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) {
var hasMsgDoNotNeedDel bool
for i, msg := range msgs.Msg {
// 找到列表中不需要删除的消息了, 表示为递归到最后一个块
if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) {
log.NewDebug(operationID, ID, "find uid", msgs.UID)
// 删除块失败 递归结束 返回0
hasMsgDoNotNeedDel = true
if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil {
return 0, err
}
// unMarshall失败 块删除成功 设置为最小seq
msgPb := &server_api_params.MsgData{}
if err = proto.Unmarshal(msg.Msg, msgPb); err != nil {
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
// 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。
if i > 0 {
delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i)
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
}
// 递归结束
return msgPb.Seq, nil
}
}
// 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归
// lastMsgPb := &server_api_params.MsgData{}
// err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
// if err != nil {
// log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
// return 0, utils.Wrap(err, "proto.Unmarshal failed")
// }
// delStruct.minSeq = lastMsgPb.Seq
if msgListIsFull(msgs) {
log.NewDebug(operationID, "msg list is full", msgs.UID)
delStruct.delUidList = append(delStruct.delUidList, msgs.UID) delStruct.delUidList = append(delStruct.delUidList, msgs.UID)
lastMsgPb := &server_api_params.MsgData{}
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
return 0, utils.Wrap(err, "proto.Unmarshal failed")
}
delStruct.minSeq = lastMsgPb.Seq
} else { } else {
// 列表没有满且没有不需要被删除的消息 代表他是最新的消息块 var hasMarkDelFlag bool
if !hasMsgDoNotNeedDel { for _, msg := range msgs.Msg {
delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1) msgPb := &server_api_params.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil { if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1) log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
err = delMongoMsgsPhysical(delStruct.delUidList) return 0, utils.Wrap(err, "proto.Unmarshal failed")
if err != nil { }
return delStruct.getSetMinSeq(), err if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) {
msgPb.Status = constant.MsgDeleted
bytes, _ := proto.Marshal(msgPb)
msg.Msg = bytes
msg.SendTime = 0
hasMarkDelFlag = true
} else {
if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil {
return 0, err
} }
return delStruct.getSetMinSeq(), nil if hasMarkDelFlag {
if err := db.DB.UpdateOneMsgList(msgs); err != nil {
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
}
return msgPb.Seq + 1, nil
} }
} }
} }
@ -179,14 +162,6 @@ func msgListIsFull(chat *db.UserChat) bool {
return false return false
} }
func CheckGroupUserMinSeq(operationID, groupID, userID string) error {
return nil
}
func CheckUserMinSeqWithMongo(operationID, userID string) error {
return nil
}
func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
var seqRedis uint64 var seqRedis uint64
var err error var err error

View File

@ -3,100 +3,102 @@ package cronTask
import ( import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db" "Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
server_api_params "Open_IM/pkg/proto/sdk_ws" server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "context"
"os/exec" "fmt"
"strconv"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"testing" "testing"
"time" "time"
) )
func getMsgListFake(num int) []*pbMsg.MsgDataToMQ { var (
var msgList []*pbMsg.MsgDataToMQ redisClient *redis.Client
for i := 1; i < num; i++ { mongoClient *mongo.Collection
msgList = append(msgList, &pbMsg.MsgDataToMQ{ )
Token: "tk",
OperationID: "operationID", func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat {
MsgData: &server_api_params.MsgData{ chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))}
SendID: "sendID1", for i := startSeq; i <= stopSeq; i++ {
RecvID: "recvID1", msg := server_api_params.MsgData{
GroupID: "", SendID: "sendID1",
ClientMsgID: "xxx", RecvID: "recvID1",
ServerMsgID: "xxx", GroupID: "",
SenderPlatformID: 1, ClientMsgID: "xxx",
SenderNickname: "testNickName", ServerMsgID: "xxx",
SenderFaceURL: "testFaceURL", SenderPlatformID: 1,
SessionType: 1, SenderNickname: "testNickName",
MsgFrom: 100, SenderFaceURL: "testFaceURL",
ContentType: 101, SessionType: 1,
Content: []byte("testFaceURL"), MsgFrom: 100,
Seq: uint32(i), ContentType: 101,
SendTime: time.Now().Unix(), Content: []byte("testFaceURL"),
CreateTime: time.Now().Unix(), Seq: uint32(i),
Status: 1, SendTime: time.Now().Unix(),
}, CreateTime: time.Now().Unix(),
}) Status: 1,
}
bytes, _ := proto.Marshal(&msg)
sendTime := 0
chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes})
} }
return msgList return chat
}
func SetUserMaxSeq(userID string, seq int) error {
return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err()
}
func CreateChat(userChat *db.UserChat) error {
_, err := mongoClient.InsertOne(context.Background(), userChat)
return err
} }
func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
operationID := getCronTaskOperationID() operationID := getCronTaskOperationID()
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:16379",
Password: "openIM123", // no password set
DB: 13, // use default DB
})
mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
"root", "openIM123", "127.0.0.1:37017",
"openIM", 100)
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri))
mongoClient = client.Database("openIM").Collection("msg")
testUID1 := "test_del_id1" testUID1 := "test_del_id1"
//testUID2 := "test_del_id2" //testUID2 := "test_del_id2"
//testUID3 := "test_del_id3" //testUID3 := "test_del_id3"
//testUID4 := "test_del_id4" //testUID4 := "test_del_id4"
//testUID5 := "test_del_id5" //testUID5 := "test_del_id5"
//testUID6 := "test_del_id6" //testUID6 := "test_del_id6"
testUserIDList := []string{testUID1} err = SetUserMaxSeq(testUID1, 600)
userChat := GenUserChat(1, 500, 200, 0, testUID1)
err = CreateChat(userChat)
err := db.DB.SetUserMaxSeq(testUID1, 500) if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil {
err = db.DB.BatchInsertChat2DB(testUID1, getMsgListFake(500), testUID1+"-"+operationID, 500) t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if err != nil { if err != nil {
t.Error(err.Error(), testUID1) t.Error("err is not nil", testUID1, err.Error())
}
//db.DB.SetUserMaxSeq(testUID1, 6000)
//db.DB.BatchInsertChat2DB()
//
//db.DB.SetUserMaxSeq(testUID1, 4999)
//db.DB.BatchInsertChat2DB()
//
//db.DB.SetUserMaxSeq(testUID1, 30000)
//db.DB.BatchInsertChat2DB()
//
//db.DB.SetUserMaxSeq(testUID1, 9999)
//db.DB.BatchInsertChat2DB()
cmd := exec.Command("/bin/bash", "unset $CONFIG_NAME")
_, err = cmd.StdoutPipe()
if err != nil {
return
}
//执行命令
if err := cmd.Start(); err != nil {
return
}
for _, userID := range testUserIDList {
operationID = userID + "-" + operationID
if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil {
t.Error("checkMaxSeqWithMongo failed", userID)
}
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", userID)
}
}
testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"}
for _, groupID := range testWorkingGroupIDList {
operationID = groupID + "-" + operationID
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList)
if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil {
t.Error("checkMaxSeqWithMongo failed", groupID)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", groupID)
}
} }
// testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"}
// for _, groupID := range testWorkingGroupIDList {
// operationID = groupID + "-" + operationID
// log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList)
// if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil {
// t.Error("checkMaxSeqWithMongo failed", groupID)
// }
// if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
// t.Error("checkMaxSeqWithMongo failed", groupID)
// }
// }
} }

View File

@ -8,8 +8,9 @@ import (
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"fmt" "fmt"
"github.com/robfig/cron/v3"
"time" "time"
"github.com/robfig/cron/v3"
) )
const cronTaskOperationID = "cronTaskOperationID-" const cronTaskOperationID = "cronTaskOperationID-"
@ -57,6 +58,7 @@ func ClearAll() {
} else { } else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
} }
// working group msg clear // working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
if err == nil { if err == nil {
@ -77,9 +79,6 @@ func StartClearMsg(operationID string, userIDList []string) {
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err) log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
} }
if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
} }
} }
@ -98,10 +97,5 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string)
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
} }
for _, userID := range userIDList {
if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
}
} }
} }

View File

@ -264,8 +264,8 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr
func (pushReq *PushReq) setPushChannel(title string, body string) { func (pushReq *PushReq) setPushChannel(title string, body string) {
pushReq.PushChannel = &PushChannel{} pushReq.PushChannel = &PushChannel{}
autoBadge := "+1" // autoBadge := "+1"
pushReq.PushChannel.Ios = &Ios{AutoBadge: &autoBadge} pushReq.PushChannel.Ios = &Ios{}
notify := "notify" notify := "notify"
pushReq.PushChannel.Ios.NotiType = &notify pushReq.PushChannel.Ios.NotiType = &notify
pushReq.PushChannel.Ios.Aps.Sound = "default" pushReq.PushChannel.Ios.Aps.Sound = "default"