diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 45377c79c..10032fcb4 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -1,6 +1,7 @@ package msg import ( + "Open_IM/pkg/utils" "context" go_redis "github.com/go-redis/redis/v8" @@ -14,11 +15,17 @@ func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAnd resp := new(open_im_sdk.GetMaxAndMinSeqResp) m := make(map[string]*open_im_sdk.MaxAndMinSeq) var maxSeq, minSeq uint64 - var err error - maxSeq, err = commonDB.DB.GetUserMaxSeq(in.UserID) - minSeq, err = commonDB.DB.GetUserMinSeq(in.UserID) - if err != nil && err != go_redis.Nil { - log.NewError(in.OperationID, "getMaxSeq from redis error", in.String(), err.Error()) + var err1, err2 error + maxSeq, err1 = commonDB.DB.GetUserMaxSeq(in.UserID) + minSeq, err2 = commonDB.DB.GetUserMinSeq(in.UserID) + if (err1 != nil && err1 != go_redis.Nil) || (err2 != nil && err2 != go_redis.Nil) { + log.NewError(in.OperationID, "getMaxSeq from redis error", in.String()) + if err1 != nil { + log.NewError(in.OperationID, utils.GetSelfFuncName(), err1.Error()) + } + if err2 != nil { + log.NewError(in.OperationID, utils.GetSelfFuncName(), err2.Error()) + } resp.ErrCode = 200 resp.ErrMsg = "redis get err" return resp, nil diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index fcca588c4..bd6d634bc 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -108,7 +108,7 @@ func (d *DataBases) IncrGroupMaxSeq(groupID string) (uint64, error) { return uint64(seq), err } -func (d *DataBases) SetGroupMaxSeq(groupID string, maxSeq uint32) error { +func (d *DataBases) SetGroupMaxSeq(groupID string, maxSeq uint64) error { key := groupMaxSeq + groupID return d.RDB.Set(context.Background(), key, maxSeq, 0).Err() } diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go index 54444a46f..bef5a312c 100644 --- a/pkg/common/db/batch_insert_chat.go +++ b/pkg/common/db/batch_insert_chat.go @@ -2,6 +2,7 @@ package db import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" @@ -104,21 +105,28 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo return nil } -func (d *DataBases) BatchInsertChat2Cache(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { +func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { newTime := getCurrentTimestampByMill() - if len(msgList) > GetSingleGocMsgNum() { + lenList := len(msgList) + if lenList > GetSingleGocMsgNum() { return errors.New("too large"), 0 } - currentMaxSeq, err := d.GetUserMaxSeq(userID) - if err == nil { - - } else if err == go_redis.Nil { - currentMaxSeq = 0 + if lenList < 1 { + return errors.New("too short as 0"), 0 + } + // judge sessionType to get seq + var currentMaxSeq uint64 + var err error + if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { + currentMaxSeq, err = d.GetGroupMaxSeq(insertID) } else { + currentMaxSeq, err = d.GetUserMaxSeq(insertID) + } + if err != nil && err != go_redis.Nil { return utils.Wrap(err, ""), 0 } - lastMaxSeq := currentMaxSeq + lastMaxSeq := currentMaxSeq for _, m := range msgList { log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) currentMaxSeq++ @@ -126,13 +134,18 @@ func (d *DataBases) BatchInsertChat2Cache(userID string, msgList []*pbMsg.MsgDat sMsg.SendTime = m.MsgData.SendTime m.MsgData.Seq = uint32(currentMaxSeq) } - log.Debug(operationID, "SetMessageToCache ", userID, len(msgList)) - err = d.SetMessageToCache(msgList, userID, operationID) + log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList)) + err = d.SetMessageToCache(msgList, insertID, operationID) if err != nil { - log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), userID) + log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), insertID) } - log.Debug(operationID, "batch to redis cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) - return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), ""), lastMaxSeq + log.Debug(operationID, "batch to redis cost time ", getCurrentTimestampByMill()-newTime, insertID, len(msgList)) + if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { + err = d.SetGroupMaxSeq(insertID, currentMaxSeq) + } else { + err = d.SetUserMaxSeq(insertID, currentMaxSeq) + } + return utils.Wrap(err, ""), lastMaxSeq } //func (d *DataBases) BatchInsertChatBoth(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {