conversationID

This commit is contained in:
wangchuxiao 2023-05-06 10:42:11 +08:00
parent 72aa6d5eab
commit c9334ca827
4 changed files with 63 additions and 38 deletions

View File

@ -20,7 +20,7 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup
if err := tokenverify.CheckAdmin(ctx); err != nil { if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err return nil, err
} }
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, 0); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
@ -31,7 +31,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
if err := m.MsgDatabase.CleanUpUserMsg(ctx, req.UserID); err != nil { if err := m.MsgDatabase.CleanUpConversationMsgs(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil

View File

@ -96,7 +96,7 @@ func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string
log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID) log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID)
continue continue
} }
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
} }
if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil { if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil {
@ -114,7 +114,7 @@ func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error {
} }
func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error { func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error {
_, maxSeqMongo, maxSeqCache, err := c.msgDatabase.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) _, maxSeqMongo, _, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, groupID)
if err != nil { if err != nil {
if err == unrelation.ErrMsgNotFound { if err == unrelation.ErrMsgNotFound {
return nil return nil
@ -126,14 +126,14 @@ func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []str
continue continue
} }
} }
if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo); err != nil {
log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion) log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion)
} }
return nil return nil
} }
func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) { func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) {
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, userID) minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, userID)
if err != nil { if err != nil {
if err != unrelation.ErrMsgNotFound { if err != unrelation.ErrMsgNotFound {
log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID) log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID)
@ -142,7 +142,7 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC
} }
log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
if minSeqCache > maxSeqCache { if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil { if err := c.msgDatabase.SetMinSeq(ctx, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else { } else {
log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
@ -152,13 +152,13 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC
} }
func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) { func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) {
minSeqCache, err = c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID) minSeqCache, err = c.msgDatabase.GetMinSeq(ctx, groupID)
if err != nil { if err != nil {
log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID) log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID)
return 0, err return 0, err
} }
if minSeqCache > maxSeqCache { if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { if err := c.msgDatabase.SetConversationUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else { } else {
log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
@ -192,16 +192,16 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error {
return err return err
} }
for _, userID := range userIDs { for _, userID := range userIDs {
userCurrentMinSeq, err := c.msgDatabase.GetUserMinSeq(ctx, userID) userCurrentMinSeq, err := c.msgDatabase.GetMinSeq(ctx, userID)
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
continue continue
} }
userCurrentMaxSeq, err := c.msgDatabase.GetUserMaxSeq(ctx, userID) userCurrentMaxSeq, err := c.msgDatabase.GetMaxSeq(ctx, userID)
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
continue continue
} }
if userCurrentMinSeq > userCurrentMaxSeq { if userCurrentMinSeq > userCurrentMaxSeq {
if err = c.msgDatabase.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { if err = c.msgDatabase.SetMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq) fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
} }
fmt.Println("fix", userID, userCurrentMaxSeq) fmt.Println("fix", userID, userCurrentMaxSeq)
@ -213,7 +213,7 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error {
return err return err
} }
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
maxSeq, err := c.msgDatabase.GetGroupMaxSeq(ctx, groupID) maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, groupID)
if err != nil { if err != nil {
fmt.Println("GetGroupMaxSeq failed", groupID) fmt.Println("GetGroupMaxSeq failed", groupID)
continue continue
@ -224,13 +224,13 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error {
continue continue
} }
for _, userID := range userIDs { for _, userID := range userIDs {
userMinSeq, err := c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID) userMinSeq, err := c.msgDatabase.GetMinSeq(ctx, groupID)
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
fmt.Println("GetGroupUserMinSeq failed", groupID, userID) fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
continue continue
} }
if userMinSeq > maxSeq { if userMinSeq > maxSeq {
if err = c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil { if err = c.msgDatabase.SetMinSeq(ctx, groupID, maxSeq); err != nil {
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq) fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
} }
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq) fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)

View File

@ -75,8 +75,10 @@ type MsgDatabase interface {
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error) GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// to mq // to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
@ -312,7 +314,6 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID
if lenList < 1 { if lenList < 1 {
return 0, errors.New("too short as 0") return 0, errors.New("too short as 0")
} }
// judge sessionType to get seq
lastMaxSeq := currentMaxSeq lastMaxSeq := currentMaxSeq
for _, m := range msgs { for _, m := range msgs {
currentMaxSeq++ currentMaxSeq++
@ -581,7 +582,7 @@ func (db *msgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, c
if minSeq == 0 { if minSeq == 0 {
return nil return nil
} }
return db.cache.SetUserMinSeq(ctx, map[string]int64{conversationID: minSeq}) return db.cache.SetMinSeq(ctx, conversationID, minSeq)
} }
// this is struct for recursion // this is struct for recursion
@ -604,9 +605,9 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st
if err != nil || msgs.DocID == "" { if err != nil || msgs.DocID == "" {
if err != nil { if err != nil {
if err == unrelation.ErrMsgListNotExist { if err == unrelation.ErrMsgListNotExist {
log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", conversationID, "index:", index, err.Error()) log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else { } else {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
} }
} }
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 // 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
@ -616,7 +617,7 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st
} }
return delStruct.getSetMinSeq() + 1, nil return delStruct.getSetMinSeq() + 1, nil
} }
//log.NewDebug(operationID, "ID:", conversationID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) log.ZDebug(ctx, "conversationID", conversationID, "index:", index, "docID", msgs.DocID, "len", len(msgs.Msg))
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
} }
@ -625,17 +626,17 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st
lastMsgPb := &sdkws.MsgData{} lastMsgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
if err != nil { if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed") return 0, utils.Wrap(err, "proto.Unmarshal failed")
} }
delStruct.minSeq = lastMsgPb.Seq delStruct.minSeq = lastMsgPb.Seq
} else { } else {
var hasMarkDelFlag bool var hasMarkDelFlag bool
for _, msg := range msgs.Msg { for i, msg := range msgs.Msg {
msgPb := &sdkws.MsgData{} msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb) err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil { if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed") return 0, utils.Wrap(err, "proto.Unmarshal failed")
} }
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
@ -666,16 +667,16 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st
func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID) minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID)
if err != nil { if err != nil {
return 0, 0, 0, 0, err return
} }
// from cache // from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, conversationID) minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID)
if err != nil { if err != nil {
return 0, 0, 0, 0, err return
} }
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, conversationID) maxSeqCache, err = db.cache.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
return 0, 0, 0, 0, err return
} }
return return
} }
@ -683,20 +684,20 @@ func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Conte
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil { if err != nil {
return 0, 0, err return
} }
msgPb, err := db.unmarshalMsg(oldestMsgMongo) msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil { if err != nil {
return 0, 0, err return
} }
minSeqMongo = msgPb.Seq minSeqMongo = msgPb.Seq
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil { if err != nil {
return 0, 0, err return
} }
msgPb, err = db.unmarshalMsg(newestMsgMongo) msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil { if err != nil {
return 0, 0, err return
} }
maxSeqMongo = msgPb.Seq maxSeqMongo = msgPb.Seq
return return
@ -720,9 +721,15 @@ func (db *msgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string)
func (db *msgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { func (db *msgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return db.cache.GetMinSeq(ctx, conversationID) return db.cache.GetMinSeq(ctx, conversationID)
} }
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) { func (db *msgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return db.cache.GetUserMinSeq(ctx, conversationIDs) return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
} }
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) { func (db *msgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) {
return db.cache.SetUserMinSeq(ctx, seqs) return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs)
}
func (db *msgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq)
}
func (db *msgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs)
} }

View File

@ -112,6 +112,24 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
return "" return ""
} }
func GetNotificationConversationIDBySessionType(sessionType int, ids ...string) string {
sort.Strings(ids)
if len(ids) > 2 || len(ids) < 1 {
return ""
}
switch sessionType {
case constant.SingleChatType:
return "n_" + strings.Join(ids, "_") // single chat
case constant.GroupChatType:
return "n_" + ids[0] // group chat
case constant.SuperGroupChatType:
return "n_" + ids[0] // super group chat
case constant.NotificationChatType:
return "n_" + ids[0] // server notification chat
}
return ""
}
func IsNotification(conversationID string) bool { func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_") return strings.HasPrefix(conversationID, "n_")
} }