This commit is contained in:
wangchuxiao 2023-02-16 15:20:59 +08:00
parent 0d0a90a14b
commit 29a1f5b4f7
12 changed files with 632 additions and 753 deletions

View File

@ -3,92 +3,77 @@ package cronTask
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
"math"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
type SeqCheckInterface interface {
ClearAll() error
type ClearMsgTool struct {
msgInterface controller.MsgInterface
userInterface controller.UserInterface
groupInterface controller.GroupInterface
}
type ClearMsgCronTask struct {
msgModel controller.MsgInterface
userModel controller.UserInterface
groupModel controller.GroupInterface
cache cache.Cache
}
func (c *ClearMsgCronTask) getCronTaskOperationID() string {
func (c *ClearMsgTool) getCronTaskOperationID() string {
return cronTaskOperationID + utils.OperationIDGenerator()
}
func (c *ClearMsgCronTask) ClearAll() {
func (c *ClearMsgTool) ClearAll() {
operationID := c.getCronTaskOperationID()
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
log.NewInfo(operationID, "========================= start del cron task =========================")
log.NewInfo(operationID, "============================ start del cron task ============================")
var err error
userIDList, err := c.userModel.GetAllUserID(ctx)
userIDList, err := c.userInterface.GetAllUserID(ctx)
if err == nil {
c.StartClearMsg(operationID, userIDList)
c.ClearUsersMsg(ctx, userIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
// working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
workingGroupIDList, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err == nil {
c.StartClearWorkingGroupMsg(operationID, workingGroupIDList)
c.ClearSuperGroupMsg(ctx, workingGroupIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
log.NewInfo(operationID, "========================= start del cron finished =========================")
log.NewInfo(operationID, "============================ start del cron finished ============================")
}
func (c *ClearMsgCronTask) StartClearMsg(operationID string, userIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
for _, userID := range userIDList {
if err := DeleteUserMsgsAndSetMinSeq(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID)
if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
}
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
}
func (c *ClearMsgCronTask) StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList)
for _, groupID := range workingGroupIDList {
userIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID)
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID)
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID)
continue
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "workingGroupIDList:", userIDList)
if err := DeleteUserSuperGroupMsgsAndSetMinSeq(operationID, groupID, userIDList); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
if
}
}
func checkMaxSeqWithMongo(operationID, sourceID string, diffusionType int) error {
func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) {
for _, groupID := range workingGroupIDList {
userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID)
if err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID)
continue
}
if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
}
func (c *ClearMsgTool) checkMaxSeqWithMongo(ctx context.Context, sourceID string, diffusionType int) error {
var seqRedis uint64
var err error
if diffusionType == constant.WriteDiffusion {

View File

@ -19,11 +19,11 @@ func StartCronTask(userID, workingGroupID string) {
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
if userID != "" {
operationID := getCronTaskOperationID()
StartClearMsg(operationID, []string{userID})
ClearUsersMsg(operationID, []string{userID})
}
if workingGroupID != "" {
operationID := getCronTaskOperationID()
StartClearWorkingGroupMsg(operationID, []string{workingGroupID})
ClearSuperGroupMsg(operationID, []string{workingGroupID})
}
if userID != "" || workingGroupID != "" {
fmt.Println("clear msg finished")

View File

@ -38,22 +38,24 @@ const (
)
type Cache interface {
IncrUserSeq(ctx context.Context, userID string) (uint64, error)
GetUserMaxSeq(ctx context.Context, userID string) (uint64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq uint64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq uint64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (uint64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error)
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
@ -61,7 +63,7 @@ type Cache interface {
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []uint32) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
@ -138,66 +140,66 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
}
// Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
return seq, err
}
// Get the largest Seq
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
// set the largest Seq
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq uint64) error {
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error {
key := userIncrSeq + uid
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
// Set the user's minimum seq
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq uint64) (err error) {
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) {
key := userMinSeq + uid
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
// Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) {
key := userMinSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error) {
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error) {
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) {
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) {
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
return seq, err
}
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error {
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error {
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
@ -231,7 +233,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err2 error) {
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
@ -398,7 +400,7 @@ func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) erro
return err
}
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []uint32, operationID string) {
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(context.Background(), key).Result()

View File

@ -26,6 +26,7 @@ type GroupInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
@ -91,6 +92,10 @@ func (g *GroupController) DismissGroup(ctx context.Context, groupID string) erro
return g.database.DismissGroup(ctx, groupID)
}
func (g *GroupController) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
return g.database.
}
func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return g.database.TakeGroupMember(ctx, groupID, userID)
}
@ -182,6 +187,7 @@ type Group interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
}
type GroupMember interface {
@ -229,6 +235,8 @@ type GroupDataBaseInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)

View File

@ -11,7 +11,6 @@ import (
"github.com/gogo/protobuf/sortkeys"
"sync"
//"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
@ -25,30 +24,31 @@ import (
type MsgInterface interface {
// 批量插入消息到db
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
// 获取群ID或者UserID最新一条在db里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在db里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 通过seqList获取db中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在db里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/cache/db然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// SetSendMsgStatus
// GetSendMsgStatus
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
}
func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
@ -59,35 +59,27 @@ type MsgController struct {
database MsgDatabase
}
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
}
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error {
return m.database.DeleteMessageFromCache(ctx, userID, msgList)
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
}
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
}
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
return m.database.DelMsgBySeqs(ctx, userID, seqs)
}
func (m *MsgController) GetNewestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
return m.database.GetNewestMsg(ctx, ID)
}
func (m *MsgController) GetOldestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
return m.database.GetOldestMsg(ctx, ID)
}
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetMsgBySeqs(ctx, userID, seqs)
}
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
}
@ -95,66 +87,87 @@ func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error
return m.database.CleanUpUserMsg(ctx, userID)
}
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
}
func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
}
func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
}
func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return m.database.SetUserMinSeq(ctx, userID, minSeq)
}
type MsgDatabaseInterface interface {
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在mongo里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在mongo里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/redis/mongo然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
}
type MsgDatabase struct {
msgModel unRelationTb.MsgDocModelInterface
msgCache cache.Cache
msg unRelationTb.MsgDocModel
mgo unRelationTb.MsgDocModelInterface
cache cache.Cache
msg unRelationTb.MsgDocModel
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
}
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill()
if len(msgList) > db.msg.GetSingleGocMsgNum() {
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large")
}
var remain uint64
blk0 := uint64(db.msg.GetSingleGocMsgNum() - 1)
var remain int64
blk0 := db.msg.GetSingleGocMsgNum() - 1
//currentMaxSeq 4998
if currentMaxSeq < uint64(db.msg.GetSingleGocMsgNum()) {
if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
remain = blk0 - currentMaxSeq //1
} else {
excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999
remain = (uint64(db.msg.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(db.msg.GetSingleGocMsgNum()))) % uint64(db.msg.GetSingleGocMsgNum())
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
}
//remain=1
insertCounter := uint64(0)
var insertCounter int64
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
docID := ""
@ -165,18 +178,18 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
currentMaxSeq++
sMsg := unRelationTb.MsgInfoModel{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
m.MsgData.Seq = currentMaxSeq
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return utils.Wrap(err, "")
}
if insertCounter < remain {
msgsToMongo = append(msgsToMongo, sMsg)
insertCounter++
docID = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
docID = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else {
msgsToMongoNext = append(msgsToMongoNext, sMsg)
docIDNext = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
docIDNext = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
}
}
@ -185,13 +198,13 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
//filter := bson.M{"uid": seqUid}
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
err = db.msgModel.PushMsgsToDoc(ctx, docID, msgsToMongo)
err = db.mgo.PushMsgsToDoc(ctx, docID, msgsToMongo)
if err != nil {
if err == mongo.ErrNoDocuments {
doc := &unRelationTb.MsgDocModel{}
doc.DocID = docID
doc.Msg = msgsToMongo
if err = db.msgModel.Create(ctx, doc); err != nil {
if err = db.mgo.Create(ctx, doc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
@ -211,7 +224,7 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
nextDoc.DocID = docIDNext
nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
if err = db.msgModel.Create(ctx, nextDoc); err != nil {
if err = db.mgo.Create(ctx, nextDoc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
@ -223,26 +236,26 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
}
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
return db.msgCache.DeleteMessageFromCache(ctx, userID, msgs)
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
}
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
if lenList > db.msg.GetSingleGocMsgNum() {
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, errors.New("too large")
}
if lenList < 1 {
return 0, errors.New("too short as 0")
}
// judge sessionType to get seq
var currentMaxSeq uint64
var currentMaxSeq int64
var err error
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
currentMaxSeq, err = db.msgCache.GetGroupMaxSeq(ctx, sourceID)
currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
} else {
currentMaxSeq, err = db.msgCache.GetUserMaxSeq(ctx, sourceID)
currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
}
if err != nil && err != redis.Nil {
@ -253,11 +266,11 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
currentMaxSeq++
m.MsgData.Seq = uint32(currentMaxSeq)
m.MsgData.Seq = currentMaxSeq
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
}
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
failedNum, err := db.msgCache.SetMessageToCache(ctx, sourceID, msgList)
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
if err != nil {
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
@ -266,9 +279,9 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
}
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = db.msgCache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
err = db.cache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
} else {
err = db.msgCache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
}
if err != nil {
prome.PromeInc(prome.SeqSetFailedCounter)
@ -278,14 +291,14 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
return lastMaxSeq, utils.Wrap(err, "")
}
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
sortkeys.Uint32s(seqs)
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap))
for k, v := range docIDSeqsMap {
go func(docID string, seqs []uint32) {
go func(docID string, seqs []int64) {
defer wg.Done()
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
@ -299,26 +312,26 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []u
return totalUnExistSeqs, nil
}
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (unExistSeqs []uint32, err error) {
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
for i, v := range seqMsgs {
if err = db.msgModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
if err = db.mgo.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
return nil, err
}
}
return unExistSeqs, nil
}
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []uint32, err error) {
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
var hasSeqList []uint32
var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
@ -344,7 +357,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
}
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetNewestMsg(ctx, sourceID)
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return nil, err
}
@ -352,7 +365,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
}
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetOldestMsg(ctx, sourceID)
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return nil, err
}
@ -368,12 +381,12 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
return msgPb, nil
}
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []uint32, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []uint32
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []int64
singleCount := 0
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
for docID, value := range m {
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
continue
@ -396,7 +409,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
}
}
if len(hasSeqs) != len(seqs) {
var diff []uint32
var diff []int64
var exceptionMsg []*sdkws.MsgData
diff = utils.Difference(hasSeqs, seqs)
if diffusionType == constant.WriteDiffusion {
@ -409,8 +422,8 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
return seqMsg, nil
}
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, userID, seqs)
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -430,8 +443,8 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []u
return successMsgs, nil
}
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, groupID, seqs)
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -456,7 +469,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
if err != nil {
return err
}
err = db.msgCache.CleanUpOneUserAllMsg(ctx, userID)
err = db.cache.CleanUpOneUserAllMsg(ctx, userID)
return utils.Wrap(err, "")
}
@ -471,15 +484,15 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
}
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
for _, userID := range userIDs {
userMinSeq, err := db.msgCache.GetGroupUserMinSeq(ctx, groupID, userID)
userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil && err != redis.Nil {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
continue
}
if userMinSeq > uint64(minSeq) {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
if userMinSeq > minSeq {
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
} else {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, uint64(minSeq))
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
@ -497,16 +510,16 @@ func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID st
if minSeq == 0 {
return nil
}
return db.msgCache.SetUserMinSeq(ctx, userID, uint64(minSeq))
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}
// this is struct for recursion
type delMsgRecursionStruct struct {
minSeq uint32
minSeq int64
delDocIDList []string
}
func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
return d.minSeq
}
@ -514,9 +527,9 @@ func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (uint32, error) {
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgs, err := db.msgModel.GetMsgsByIndex(ctx, sourceID, index)
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
if err != nil || msgs.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
@ -526,14 +539,14 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
}
}
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
err = db.msgModel.Delete(ctx, delStruct.delDocIDList)
err = db.mgo.Delete(ctx, delStruct.delDocIDList)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if len(msgs.Msg) > db.msg.GetSingleGocMsgNum() {
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.NewWarn(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "msgs too large:", len(msgs.Msg), "docID:", msgs.DocID)
}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
@ -561,11 +574,11 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
msg.SendTime = 0
hasMarkDelFlag = true
} else {
if err := db.msgModel.Delete(ctx, delStruct.delDocIDList); err != nil {
if err := db.mgo.Delete(ctx, delStruct.delDocIDList); err != nil {
return 0, err
}
if hasMarkDelFlag {
if err := db.msgModel.UpdateOneDoc(ctx, msgs); err != nil {
if err := db.mgo.UpdateOneDoc(ctx, msgs); err != nil {
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
}
@ -578,3 +591,62 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
return seq, utils.Wrap(err, "deleteMsg failed")
}
func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
// from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil {
return 0, 0, err
}
minSeqMongo = msgPb.Seq
newestMsgMongo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil {
return 0, 0, err
}
maxSeqMongo = msgPb.Seq
return
}
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}

View File

@ -67,3 +67,10 @@ func (g *GroupGorm) Search(ctx context.Context, keyword string, pageNumber, show
}()
return gormSearch[relation.GroupModel](getDBConn(g.DB, tx), []string{"name"}, keyword, pageNumber, showNumber)
}
func (g *GroupGorm) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
if err := g.DB.Model(&relation.GroupModel{}).Where("group_type = ? ", groupType).Pluck("group_id", &groupIDs).Error; err != nil {
return nil, utils.Wrap(err, "")
}
return groupIDs, nil
}

View File

@ -41,7 +41,7 @@ func (MsgDocModel) TableName() string {
return CChat
}
func (MsgDocModel) GetSingleGocMsgNum() int {
func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum
}
@ -59,36 +59,36 @@ func (m *MsgDocModel) IsFull() bool {
return false
}
func (m MsgDocModel) GetDocID(sourceID string, seq uint32) string {
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix)
}
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq uint32) []string {
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, uint32(i))
seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID)
}
return seqUserIDs
}
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string {
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix)
}
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix uint32) string {
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []uint32) map[string][]uint32 {
t := make(map[string][]uint32)
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok {
var temp []uint32
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
@ -108,11 +108,11 @@ func (m MsgDocModel) getMsgIndex(seq uint32) int {
return int(index)
}
func (m MsgDocModel) indexGen(sourceID string, seqSuffix uint32) string {
return sourceID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdkws.MsgData) {
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
@ -121,7 +121,7 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdk
return exceptionMsg
}
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) {
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v

View File

@ -239,17 +239,6 @@ func NewWarn(OperationID string, args ...interface{}) {
func ShowLog(ctx context.Context) {
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
OperationID := tracelog.GetOperationID(ctx)
//if ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo).GinCtx != nil {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("api: ", t.ApiName)
//} else {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("rpc: ", t.ApiName)
//}
for _, v := range *t.Funcs {
if v.Err != nil {

View File

@ -223,6 +223,13 @@ message MsgDataToModifyByMQ{
string triggerID = 3;
}
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
service msg {
//seq
@ -232,7 +239,7 @@ service msg {
//
rpc SendMsg(SendMsgReq) returns(SendMsgResp);
//
rpc DelMsgList(sdkws.DelMsgListReq) returns(sdkws.DelMsgListResp);
rpc DelMsgList(DelMsgListReq) returns(DelMsgListResp);
//
rpc DelSuperGroupMsg(DelSuperGroupMsgReq) returns(DelSuperGroupMsgResp);
//

File diff suppressed because it is too large Load Diff

View File

@ -132,12 +132,12 @@ message FriendRequest{
///////////////////////////////////base end/////////////////////////////////////
message PullMessageBySeqListReq{
string userID = 1;
repeated uint32 seqList = 3;
repeated int64 seqs = 3;
map <string, seqList>groupSeqList = 4;
}
message seqList {
repeated uint32 seqList = 1;
repeated int64 seqs = 1;
}
@ -159,12 +159,12 @@ message GetMaxAndMinSeqReq {
string userID = 2;
}
message MaxAndMinSeq{
uint32 maxSeq = 1;
uint32 minSeq = 2;
int64 maxSeq = 1;
int64 minSeq = 2;
}
message GetMaxAndMinSeqResp {
uint32 maxSeq = 1;
uint32 minSeq = 2;
int64 maxSeq = 1;
int64 minSeq = 2;
map<string, MaxAndMinSeq> groupMaxAndMinSeq = 5;
}
@ -187,7 +187,7 @@ message MsgData {
int32 msgFrom = 10;
int32 contentType = 11;
bytes content = 12;
uint32 seq = 14;
int64 seq = 14;
int64 sendTime = 15;
int64 createTime = 16;
int32 status = 17;
@ -426,7 +426,7 @@ message ConversationSetPrivateTips{
message DeleteMessageTips{
string opUserID = 1;
string userID = 2;
repeated uint32 seqList = 3;
repeated int64 seqs = 3;
}
///cms
message RequestPagination {
@ -601,13 +601,7 @@ message SignalGetTokenByRoomIDReply {
}
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
message SetAppBackgroundStatusReq {
string userID = 1;
@ -642,9 +636,3 @@ message KeyValue {
int64 latestUpdateTime = 3;
}
message ResponsePagination {
int32 CurrentPage = 5;
int32 ShowNumber = 6;
}

View File

@ -57,9 +57,9 @@ func cleanUpFuncName(funcName string) string {
}
// Get the intersection of two slices
func Intersect(slice1, slice2 []uint32) []uint32 {
m := make(map[uint32]bool)
n := make([]uint32, 0)
func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]bool)
n := make([]int64, 0)
for _, v := range slice1 {
m[v] = true
}
@ -73,9 +73,9 @@ func Intersect(slice1, slice2 []uint32) []uint32 {
}
// Get the diff of two slices
func Difference(slice1, slice2 []uint32) []uint32 {
m := make(map[uint32]bool)
n := make([]uint32, 0)
func Difference(slice1, slice2 []int64) []int64 {
m := make(map[int64]bool)
n := make([]int64, 0)
inter := Intersect(slice1, slice2)
for _, v := range inter {
m[v] = true