diff --git a/internal/push/fcm/push.go b/internal/push/fcm/push.go index 2aa7f7f63..1a8617c3b 100644 --- a/internal/push/fcm/push.go +++ b/internal/push/fcm/push.go @@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan type Fcm struct { fcmMsgCli *messaging.Client - cache cache.MsgCache + cache cache.Cache } -func NewClient(cache cache.MsgCache) *Fcm { +func NewClient(cache cache.Cache) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/fcm/push_test.go b/internal/push/fcm/push_test.go index cc310b30e..0df2b45da 100644 --- a/internal/push/fcm/push_test.go +++ b/internal/push/fcm/push_test.go @@ -9,7 +9,7 @@ import ( ) func Test_Push(t *testing.T) { - var redis cache.MsgCache + var redis cache.Cache offlinePusher := NewClient(redis) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{}) assert.Nil(t, err) diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 02436a8d3..b263a11a2 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -38,12 +38,12 @@ const ( ) type Client struct { - cache cache.MsgCache + cache cache.Cache tokenExpireTime int64 taskIDTTL int64 } -func NewClient(cache cache.MsgCache) *Client { +func NewClient(cache cache.Cache) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index b5b478a12..080377305 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -25,7 +25,7 @@ type RPCServer struct { pusher Pusher } -func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) { +func (r *RPCServer) Init(rpcPort int, cache cache.Cache) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 097257c55..53b05b572 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -25,7 +25,7 @@ import ( ) type Pusher struct { - cache cache.MsgCache + cache cache.Cache client discoveryregistry.SvcDiscoveryRegistry offlinePusher OfflinePusher groupLocalCache localcache.GroupLocalCache @@ -33,7 +33,7 @@ type Pusher struct { successCount int } -func NewPusher(cache cache.MsgCache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { +func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { return &Pusher{ cache: cache, client: client, diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 6c04fe637..96acffe5f 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -3,12 +3,12 @@ package conversation import ( "OpenIM/internal/common/check" "OpenIM/internal/common/notification" - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/relation" tableRelation "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/db/tx" pbConversation "OpenIM/pkg/proto/conversation" "OpenIM/pkg/utils" "context" diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index f345034c2..5f472c27e 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -4,11 +4,11 @@ import ( "OpenIM/internal/common/check" "OpenIM/internal/common/convert" "OpenIM/internal/common/notification" - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/relation" tablerelation "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/db/tx" "OpenIM/pkg/common/tokenverify" "OpenIM/pkg/common/tracelog" registry "OpenIM/pkg/discoveryregistry" diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 048cc4cd1..064ba33b9 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -3,12 +3,12 @@ package group import ( "OpenIM/internal/common/check" "OpenIM/internal/common/notification" - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/relation" relationTb "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/db/tx" "OpenIM/pkg/common/db/unrelation" "OpenIM/pkg/common/tokenverify" "OpenIM/pkg/common/tracelog" diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 79843fa3a..02854cb29 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -9,7 +9,7 @@ import ( func (m *msgServer) DelMsgList(ctx context.Context, req *sdkws.DelMsgListReq) (*sdkws.DelMsgListResp, error) { resp := &sdkws.DelMsgListResp{} - if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil { + if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil { return nil, err } DeleteMessageNotification(ctx, req.UserID, req.SeqList) @@ -21,14 +21,14 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } - //maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) + //maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, req.GroupID) //if err != nil { // return nil, err //} - //if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { + //if err := m.MsgDatabase.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { // return nil, err //} - if err := m.MsgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { + if err := m.MsgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { return nil, err } return resp, nil @@ -39,10 +39,10 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } - if err := m.MsgInterface.CleanUpUserMsg(ctx, req.UserID); err != nil { + if err := m.MsgDatabase.CleanUpUserMsg(ctx, req.UserID); err != nil { return nil, err } - //if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { + //if err := m.MsgDatabase.DelUserAllSeq(ctx, req.UserID); err != nil { // return nil, err //} return resp, nil diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index 95cd057c5..866a39198 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -27,7 +27,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false) return resp, nil } - isExists, err := m.MsgInterface.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType) + isExists, err := m.MsgDatabase.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType) if err != nil { return nil, err } @@ -41,12 +41,12 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S return nil, err } v.LatestUpdateTime = utils.GetCurrentTimestampByMill() - if err := m.MsgInterface.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { + if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { return nil, err } } resp.IsReact = true - _, err := m.MsgInterface.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) + _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S if err != nil { return nil, err } - mongoValue, err := m.MsgInterface.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) if err != nil { return nil, err } diff --git a/internal/rpc/msg/msg_status.go b/internal/rpc/msg/msg_status.go index 27c5cc136..5d408430e 100644 --- a/internal/rpc/msg/msg_status.go +++ b/internal/rpc/msg/msg_status.go @@ -9,7 +9,7 @@ import ( func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) { resp := &pbMsg.SetSendMsgStatusResp{} - if err := m.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil { + if err := m.MsgDatabase.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil { return nil, err } return resp, nil @@ -17,7 +17,7 @@ func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgS func (m *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (*pbMsg.GetSendMsgStatusResp, error) { resp := &pbMsg.GetSendMsgStatusResp{} - status, err := m.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx)) + status, err := m.MsgDatabase.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx)) if IsNotFound(err) { resp.Status = constant.MsgStatusNotExist return resp, nil diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 8f8bc9220..e082cf63f 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -165,7 +165,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe } if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID { - resp, err := m.MsgInterface.GetSuperGroupMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)}) + resp, err := m.MsgDatabase.GetSuperGroupMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)}) if err != nil { return nil, err } @@ -352,7 +352,7 @@ func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []strin if v == "" || groupPB.MsgData.SendID == "" { return constant.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty") } - err := m.MsgInterface.MsgToMQ(ctx, v, &msgToMQGroup) + err := m.MsgDatabase.MsgToMQ(ctx, v, &msgToMQGroup) if err != nil { wg.Done() return err diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 3919b125d..6461189bc 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -25,7 +25,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR return nil, err } msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle) if err != nil { return nil, err } @@ -42,12 +42,12 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR } func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle) if err != nil { return nil, err } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle) if err != nil { return nil, err } @@ -74,13 +74,13 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) } msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} if isSend { - err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle) if err != nil { return nil, constant.ErrInternalServer.Wrap("insert to mq") } } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle) if err != nil { return nil, constant.ErrInternalServer.Wrap("insert to mq") } @@ -255,11 +255,11 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { resp := new(sdkws.GetMaxAndMinSeqResp) m2 := make(map[string]*sdkws.MaxAndMinSeq) - maxSeq, err := m.MsgInterface.GetUserMaxSeq(ctx, req.UserID) + maxSeq, err := m.MsgDatabase.GetUserMaxSeq(ctx, req.UserID) if err != nil { return nil, err } - minSeq, err := m.MsgInterface.GetUserMinSeq(ctx, req.UserID) + minSeq, err := m.MsgDatabase.GetUserMinSeq(ctx, req.UserID) if err != nil { return nil, err } @@ -268,11 +268,11 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin if len(req.GroupIDList) > 0 { resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq) for _, groupID := range req.GroupIDList { - maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, groupID) + maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID) if err != nil { return nil, err } - minSeq, err := m.MsgInterface.GetGroupMinSeq(ctx, groupID) + minSeq, err := m.MsgDatabase.GetGroupMinSeq(ctx, groupID) if err != nil { return nil, err } @@ -287,13 +287,13 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)} - msgs, err := m.MsgInterface.GetMessagesBySeqs(ctx, req.UserID, req.Seqs) + msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs) if err != nil { return nil, err } resp.List = msgs for userID, list := range req.GroupSeqList { - msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, userID, req.Seqs) + msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, userID, req.Seqs) if err != nil { return nil, err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 78a4388b0..ef8c79077 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,7 +16,7 @@ import ( type msgServer struct { RegisterCenter discoveryRegistry.SvcDiscoveryRegistry - MsgInterface controller.MsgDatabase + MsgDatabase controller.MsgDatabase Group *check.GroupChecker User *check.UserCheck Conversation *check.ConversationChecker @@ -45,7 +45,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error { Conversation: check.NewConversationChecker(client), User: check.NewUserCheck(client), Group: check.NewGroupChecker(client), - //MsgInterface: controller.MsgInterface(), + //MsgDatabase: controller.MsgDatabase(), RegisterCenter: client, GroupLocalCache: localcache.NewGroupMemberIDsLocalCache(client), black: check.NewBlackChecker(client), diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go deleted file mode 100644 index e54ff524b..000000000 --- a/pkg/common/db/cache/msg.go +++ /dev/null @@ -1,475 +0,0 @@ -package cache - -import ( - "OpenIM/pkg/common/config" - "OpenIM/pkg/common/constant" - "OpenIM/pkg/common/tracelog" - pbChat "OpenIM/pkg/proto/msg" - pbRtc "OpenIM/pkg/proto/rtc" - "OpenIM/pkg/proto/sdkws" - "OpenIM/pkg/utils" - "context" - "errors" - "fmt" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" - "strconv" - "time" - - "github.com/go-redis/redis/v8" -) - -const ( - userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq - appleDeviceToken = "DEVICE_TOKEN" - userMinSeq = "REDIS_USER_MIN_SEQ:" - - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - FcmToken = "FCM_TOKEN:" - groupUserMinSeq = "GROUP_USER_MIN_SEQ:" - groupMaxSeq = "GROUP_MAX_SEQ:" - groupMinSeq = "GROUP_MIN_SEQ:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" - exTypeKeyLocker = "EX_LOCK:" - - uidPidToken = "UID_PID_TOKEN_STATUS:" - - SignalListCache = "SIGNAL_LIST_CACHE:" - - SignalCache = "SIGNAL_CACHE:" -) - -type MsgCache interface { - IncrUserSeq(ctx context.Context, userID string) (int64, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) - IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error - SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error - - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error - - GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) - - SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error - DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error - GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) - DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error - CleanUpOneUserAllMsg(ctx context.Context, userID string) error - HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) - GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) - GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) - DelUserSignalList(ctx context.Context, userID string) error - DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error - - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) - - SetSendMsgStatus(ctx context.Context, id string, status int32) error - GetSendMsgStatus(ctx context.Context, id string) (int32, error) - SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) - GetFcmToken(ctx context.Context, account string, platformID int) (string, error) - DelFcmToken(ctx context.Context, account string, platformID int) error - IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error - GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) - DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) - GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error -} - -func NewMsgCache(client redis.UniversalClient) MsgCache { - return &msgCache{rdb: client} -} - -type msgCache struct { - rdb redis.UniversalClient -} - -func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) -} - -func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) -} - -func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { - return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) -} - -func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) -} - -func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64()) -} - -func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - key := groupUserMinSeq + "g:" + groupID + "u:" + userID - return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) -} - -func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) -} - -func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) -} - -func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) -} - -func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - key := groupMaxSeq + groupID - seq, err := m.rdb.Incr(ctx, key).Uint64() - return int64(seq), utils.Wrap1(err) -} - -func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { - key := groupMaxSeq + groupID - return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err()) -} - -func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { - key := groupMinSeq + groupID - return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) -} - -func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err()) -} - -func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { - key := uidPidToken + userID + ":" + platformID - m, err := m.rdb.HGetAll(ctx, key).Result() - if err != nil { - return nil, utils.Wrap1(err) - } - mm := make(map[string]int) - for k, v := range m { - mm[k] = utils.StringToInt(v) - } - return mm, nil -} - -func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - mm := make(map[string]interface{}) - for k, v := range m { - mm[k] = v - } - return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err()) -} - -func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err()) -} - -func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) { - var errResult error - for _, v := range seqList { - //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := m.rdb.Get(ctx, key).Result() - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - } else { - msg := sdkws.MsgData{} - err = jsonpb.UnmarshalString(result, &msg) - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - } else { - seqMsg = append(seqMsg, &msg) - } - - } - } - return seqMsg, failedSeqList, errResult -} - -func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) { - pipe := m.rdb.Pipeline() - var failedList []pbChat.MsgDataToMQ - for _, msg := range msgList { - key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) - s, err := utils.Pb2String(msg.MsgData) - if err != nil { - return 0, utils.Wrap1(err) - } - err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - if err != nil { - return 0, utils.Wrap1(err) - } - } - if len(failedList) != 0 { - return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx))) - } - _, err := pipe.Exec(ctx) - return 0, err -} - -func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { - for _, msg := range msgList { - if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil { - return utils.Wrap1(err) - } - } - return nil -} - -func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { - key := messageCache + userID + "_" + "*" - vals, err := m.rdb.Keys(ctx, key).Result() - if err == redis.Nil { - return nil - } - if err != nil { - return utils.Wrap1(err) - } - for _, v := range vals { - if err := m.rdb.Del(ctx, v).Err(); err != nil { - return utils.Wrap1(err) - } - } - return nil -} - -func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { - req := &pbRtc.SignalReq{} - if err := proto.Unmarshal(msg.Content, req); err != nil { - return false, utils.Wrap1(err) - } - var inviteeUserIDList []string - var isInviteSignal bool - switch signalInfo := req.Payload.(type) { - case *pbRtc.SignalReq_Invite: - inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList - isInviteSignal = true - case *pbRtc.SignalReq_InviteInGroup: - inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList - isInviteSignal = true - if !utils.Contain(pushToUserID, inviteeUserIDList...) { - return false, nil - } - case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: - return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush")) - default: - return false, nil - } - if isInviteSignal { - for _, userID := range inviteeUserIDList { - timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) - if err != nil { - return false, utils.Wrap1(err) - } - keyList := SignalListCache + userID - err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err() - if err != nil { - return false, utils.Wrap1(err) - } - err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err() - if err != nil { - return false, utils.Wrap1(err) - } - key := SignalCache + msg.ClientMsgID - err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() - if err != nil { - return false, utils.Wrap1(err) - } - } - } - return true, nil -} - -func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { - bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes() - if err != nil { - return nil, utils.Wrap1(err) - } - req := &pbRtc.SignalReq{} - if err = proto.Unmarshal(bytes, req); err != nil { - return nil, utils.Wrap1(err) - } - invitationInfo = &pbRtc.SignalInviteReq{} - switch req2 := req.Payload.(type) { - case *pbRtc.SignalReq_Invite: - invitationInfo.Invitation = req2.Invite.Invitation - invitationInfo.OpUserID = req2.Invite.OpUserID - case *pbRtc.SignalReq_InviteInGroup: - invitationInfo.Invitation = req2.InviteInGroup.Invitation - invitationInfo.OpUserID = req2.InviteInGroup.OpUserID - } - return invitationInfo, nil -} - -func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { - key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result() - if err != nil { - return nil, utils.Wrap1(err) - } - invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key) - if err != nil { - return nil, err - } - return invitationInfo, m.DelUserSignalList(ctx, userID) -} - -func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error { - return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err()) -} - -func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error { - for _, seq := range seqList { - key := messageCache + userID + "_" + strconv.Itoa(int(seq)) - result, err := m.rdb.Get(ctx, key).Result() - if err != nil { - if err == redis.Nil { - continue - } - return utils.Wrap1(err) - } - var msg sdkws.MsgData - if err := jsonpb.UnmarshalString(result, &msg); err != nil { - return err - } - msg.Status = constant.MsgDeleted - s, err := utils.Pb2String(&msg) - if err != nil { - return utils.Wrap1(err) - } - if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { - return utils.Wrap1(err) - } - } - return nil -} - -func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { - return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) -} - -func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) { - return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result()) -} - -func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { - return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) -} - -func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { - return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result()) -} - -func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) -} - -func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int() - return int32(result), utils.Wrap1(err) -} - -func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { - return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) -} - -func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) -} - -func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { - return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) -} - -func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() - return int(seq), utils.Wrap1(err) -} - -func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { - return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) -} - -func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) -} - -func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err()) -} - -func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return utils.Wrap1(m.rdb.Del(ctx, key).Err()) -} - -func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { - switch sessionType { - case constant.SingleChatType: - return "EX_SINGLE_" + clientMsgID - case constant.GroupChatType: - return "EX_GROUP_" + clientMsgID - case constant.SuperGroupChatType: - return "EX_SUPER_GROUP_" + clientMsgID - case constant.NotificationChatType: - return "EX_NOTIFICATION" + clientMsgID - } - return "" -} - -func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() - if err != nil { - return false, utils.Wrap(err, "") - } - return n > 0, nil -} - -func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) -} - -func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) -} - -func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) -} - -func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) -} - -func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) -} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 8869eaf59..900b7c4ec 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -3,6 +3,7 @@ package cache import ( "OpenIM/pkg/common/config" "OpenIM/pkg/common/constant" + "OpenIM/pkg/common/tracelog" pbChat "OpenIM/pkg/proto/msg" pbRtc "OpenIM/pkg/proto/rtc" "OpenIM/pkg/proto/sdkws" @@ -10,12 +11,12 @@ import ( "context" "errors" "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "strconv" "time" "github.com/go-redis/redis/v8" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" ) const ( @@ -37,6 +38,10 @@ const ( exTypeKeyLocker = "EX_LOCK:" uidPidToken = "UID_PID_TOKEN_STATUS:" + + SignalListCache = "SIGNAL_LIST_CACHE:" + + SignalCache = "SIGNAL_CACHE:" ) type Cache interface { @@ -49,6 +54,7 @@ type Cache interface { SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error @@ -74,8 +80,8 @@ type Cache interface { SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error GetGetuiTaskID(ctx context.Context) (string, error) - SetSendMsgStatus(ctx context.Context, status int32) error - GetSendMsgStatus(ctx context.Context) (int, error) + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) DelFcmToken(ctx context.Context, account string, platformID int) error @@ -92,298 +98,227 @@ type Cache interface { UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error } -// native redis operate - -//func NewRedis() *RedisClient { -// o := &RedisClient{} -// o.InitRedis() -// return o -//} - -func NewRedis() (*RedisClient, error) { - var rdb redis.UniversalClient - if config.Config.Redis.EnableCluster { - rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Config.Redis.DBAddress, - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set - PoolSize: 50, - }) - //if err := rdb.Ping(ctx).Err();err != nil { - // return nil, fmt.Errorf("redis ping %w", err) - //} - //return &RedisClient{rdb: rdb}, nil - } else { - rdb = redis.NewClient(&redis.Options{ - Addr: config.Config.Redis.DBAddress[0], - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set - DB: 0, // use default DB - PoolSize: 100, // 连接池大小 - }) - //err := rdb.Ping(ctx).Err() - //if err != nil { - // panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord) - //} - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - err := rdb.Ping(ctx).Err() - if err != nil { - return nil, fmt.Errorf("redis ping %w", err) - } - return &RedisClient{rdb: rdb}, nil +func NewMsgCache(client redis.UniversalClient) Cache { + return &msgCache{rdb: client} } -type RedisClient struct { +type msgCache struct { rdb redis.UniversalClient } -func NewRedisClient(rdb redis.UniversalClient) *RedisClient { - return &RedisClient{rdb: rdb} +func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) } -func (r *RedisClient) GetClient() redis.UniversalClient { - return r.rdb +func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) } -// Perform seq auto-increment operation of user messages -func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) { - key := userIncrSeq + uid - seq, err := r.rdb.Incr(context.Background(), key).Result() - return seq, err +func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { + return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) } -// Get the largest Seq -func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) { - key := userIncrSeq + uid - seq, err := r.rdb.Get(context.Background(), key).Result() - return int64(utils.StringToInt(seq)), err +func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) } -// set the largest Seq -func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error { - key := userIncrSeq + uid - return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() +func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64()) } -// Set the user's minimum seq -func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) { - key := userMinSeq + uid - return r.rdb.Set(context.Background(), key, minSeq, 0).Err() -} - -// Get the smallest Seq -func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) { - key := userMinSeq + uid - seq, err := r.rdb.Get(context.Background(), key).Result() - return int64(utils.StringToInt(seq)), err -} - -func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { key := groupUserMinSeq + "g:" + groupID + "u:" + userID - return r.rdb.Set(context.Background(), key, minSeq, 0).Err() -} -func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - key := groupUserMinSeq + "g:" + groupID + "u:" + userID - seq, err := r.rdb.Get(context.Background(), key).Result() - return int64(utils.StringToInt(seq)), err + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) } -func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { key := groupMaxSeq + groupID - seq, err := r.rdb.Get(context.Background(), key).Result() - return int64(utils.StringToInt(seq)), err + seq, err := m.rdb.Incr(ctx, key).Uint64() + return int64(seq), utils.Wrap1(err) } -func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { key := groupMaxSeq + groupID - seq, err := r.rdb.Incr(context.Background(), key).Result() - return seq, err + return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err()) } -func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { - key := groupMaxSeq + groupID - return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() -} - -func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { +func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { key := groupMinSeq + groupID - return r.rdb.Set(context.Background(), key, minSeq, 0).Err() + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) } -// Store userid and platform class to redis -func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { +func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return r.rdb.HSet(context.Background(), key, token, flag).Err() + return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err()) } -//key:userID+platform-> -func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID int) (map[string]int, error) { +func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { key := uidPidToken + userID + ":" + platformID - m, err := r.rdb.HGetAll(context.Background(), key).Result() - mm := make(map[string]int) - for k, v := range m { - mm[k] = utils.StringToInt(v) - } - return mm, err -} - -func (r *RedisClient) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) { - key := uidPidToken + userID + ":" + platform - m, err := r.rdb.HGetAll(context.Background(), key).Result() - if err != nil && err == redis.Nil { - return nil, nil + m, err := m.rdb.HGetAll(ctx, key).Result() + if err != nil { + return nil, utils.Wrap1(err) } mm := make(map[string]int) for k, v := range m { mm[k] = utils.StringToInt(v) } - return mm, utils.Wrap(err, "") + return mm, nil } -func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { - key := uidPidToken + userID + ":" + platform +func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) mm := make(map[string]interface{}) for k, v := range m { mm[k] = v } - return r.rdb.HSet(context.Background(), key, mm).Err() + return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err()) } -func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { - key := uidPidToken + userID + ":" + platform - return r.rdb.HDel(context.Background(), key, fields...).Err() +func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err()) } -func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) { +func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) { + var errResult error for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := r.rdb.Get(context.Background(), key).Result() + result, err := m.rdb.Get(ctx, key).Result() if err != nil { - if err != redis.Nil { - err2 = err - } - failedSeqs = append(failedSeqs, v) + errResult = err + failedSeqList = append(failedSeqList, v) } else { msg := sdkws.MsgData{} err = jsonpb.UnmarshalString(result, &msg) if err != nil { - err2 = err - failedSeqs = append(failedSeqs, v) + errResult = err + failedSeqList = append(failedSeqList, v) } else { - seqMsgs = append(seqMsgs, &msg) + seqMsg = append(seqMsg, &msg) } + } } - return seqMsgs, failedSeqs, err2 + return seqMsg, failedSeqList, errResult } -func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) { - pipe := r.rdb.Pipeline() - var failedMsgs []pbChat.MsgDataToMQ - for _, msg := range msgs { - key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) +func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) { + pipe := m.rdb.Pipeline() + var failedList []pbChat.MsgDataToMQ + for _, msg := range msgList { + key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) s, err := utils.Pb2String(msg.MsgData) if err != nil { - continue + return 0, utils.Wrap1(err) } err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - //err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() if err != nil { - failedMsgs = append(failedMsgs, *msg) + return 0, utils.Wrap1(err) } } - if len(failedMsgs) != 0 { - return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) + if len(failedList) != 0 { + return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx))) } _, err := pipe.Exec(ctx) return 0, err } -func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error { - for _, msg := range msgs { - key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) - err := r.rdb.Del(ctx, key).Err() - if err != nil { + +func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { + for _, msg := range msgList { + if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil { + return utils.Wrap1(err) } } return nil } -func (r *RedisClient) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { +func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { key := messageCache + userID + "_" + "*" - vals, err := r.rdb.Keys(ctx, key).Result() + vals, err := m.rdb.Keys(ctx, key).Result() if err == redis.Nil { return nil } if err != nil { - return utils.Wrap(err, "") + return utils.Wrap1(err) } for _, v := range vals { - err = r.rdb.Del(ctx, v).Err() + if err := m.rdb.Del(ctx, v).Err(); err != nil { + return utils.Wrap1(err) + } } return nil } -func (r *RedisClient) HandleSignalInfo(ctx context.Context, operationID string, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { +func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { req := &pbRtc.SignalReq{} if err := proto.Unmarshal(msg.Content, req); err != nil { - return false, err + return false, utils.Wrap1(err) } - var inviteeUserIDs []string + var inviteeUserIDList []string var isInviteSignal bool switch signalInfo := req.Payload.(type) { case *pbRtc.SignalReq_Invite: - inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList + inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList isInviteSignal = true case *pbRtc.SignalReq_InviteInGroup: - inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList + inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList isInviteSignal = true - if !utils.IsContain(pushToUserID, inviteeUserIDs) { + if !utils.Contain(pushToUserID, inviteeUserIDList...) { return false, nil } case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: - return false, nil + return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush")) default: return false, nil } if isInviteSignal { - for _, userID := range inviteeUserIDs { + for _, userID := range inviteeUserIDList { timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) if err != nil { - return false, err + return false, utils.Wrap1(err) } - keyList := signalListCache + userID - err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err() + keyList := SignalListCache + userID + err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err() if err != nil { - return false, err + return false, utils.Wrap1(err) } - err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() + err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err() if err != nil { - return false, err + return false, utils.Wrap1(err) } - key := signalCache + msg.ClientMsgID - err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() + key := SignalCache + msg.ClientMsgID + err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() if err != nil { - return false, err + return false, utils.Wrap1(err) } } } return true, nil } -func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { - key := signalCache + clientMsgID - invitationInfo = &pbRtc.SignalInviteReq{} - bytes, err := r.rdb.Get(context.Background(), key).Bytes() +func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes() if err != nil { - return nil, err + return nil, utils.Wrap1(err) } req := &pbRtc.SignalReq{} if err = proto.Unmarshal(bytes, req); err != nil { - return nil, err + return nil, utils.Wrap1(err) } + invitationInfo = &pbRtc.SignalInviteReq{} switch req2 := req.Payload.(type) { case *pbRtc.SignalReq_Invite: invitationInfo.Invitation = req2.Invite.Invitation @@ -392,162 +327,112 @@ func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, c invitationInfo.Invitation = req2.InviteInGroup.Invitation invitationInfo.OpUserID = req2.InviteInGroup.OpUserID } - return invitationInfo, err -} - -func (r *RedisClient) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { - keyList := signalListCache + userID - result := r.rdb.LPop(context.Background(), keyList) - if err = result.Err(); err != nil { - return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed") - } - key, err := result.Result() - if err != nil { - return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed") - } - invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(ctx, key) - if err != nil { - return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID") - } - err = r.DelUserSignalList(ctx, userID) - if err != nil { - return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID") - } return invitationInfo, nil } -func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) error { - keyList := signalListCache + userID - err := r.rdb.Del(context.Background(), keyList).Err() - return err +func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result() + if err != nil { + return nil, utils.Wrap1(err) + } + invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key) + if err != nil { + return nil, err + } + return invitationInfo, m.DelUserSignalList(ctx, userID) } -func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) { +func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error { + return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err()) +} + +func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error { for _, seq := range seqList { - key := messageCache + uid + "_" + strconv.Itoa(int(seq)) - result, err := r.rdb.Get(context.Background(), key).Result() + key := messageCache + userID + "_" + strconv.Itoa(int(seq)) + result, err := m.rdb.Get(ctx, key).Result() if err != nil { if err == redis.Nil { - } else { + continue } - continue + return utils.Wrap1(err) } var msg sdkws.MsgData - if err := utils.String2Pb(result, &msg); err != nil { - continue + if err := jsonpb.UnmarshalString(result, &msg); err != nil { + return err } msg.Status = constant.MsgDeleted s, err := utils.Pb2String(&msg) if err != nil { - continue + return utils.Wrap1(err) } - if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return utils.Wrap1(err) } } + return nil } -func (r *RedisClient) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { - return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() +func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) } -func (r *RedisClient) GetGetuiToken(ctx context.Context) (string, error) { - result, err := r.rdb.Get(context.Background(), getuiToken).Result() - return result, err +func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result()) } -func (r *RedisClient) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { - return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err() +func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) } -func (r *RedisClient) GetGetuiTaskID(ctx context.Context) (string, error) { - result, err := r.rdb.Get(context.Background(), getuiTaskID).Result() - return result, err +func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result()) } -func (r *RedisClient) SetSendMsgStatus(ctx context.Context, status int32, operationID string) error { - return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err() +func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) } -func (r *RedisClient) GetSendMsgStatus(ctx context.Context, operationID string) (int, error) { - result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result() - if err != nil { - return 0, err - } - status, err := strconv.Atoi(result) - return status, err +func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int() + return int32(result), utils.Wrap1(err) } -func (r *RedisClient) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { - key := FcmToken + account + ":" + strconv.Itoa(platformID) - return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err() +func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) } -func (r *RedisClient) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - key := FcmToken + account + ":" + strconv.Itoa(platformID) - return r.rdb.Get(context.Background(), key).Result() -} -func (r *RedisClient) DelFcmToken(ctx context.Context, account string, platformID int) error { - key := FcmToken + account + ":" + strconv.Itoa(platformID) - return r.rdb.Del(context.Background(), key).Err() -} -func (r *RedisClient) IncrUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) { - key := userBadgeUnreadCountSum + uid - seq, err := r.rdb.Incr(context.Background(), key).Result() - return int(seq), err -} -func (r *RedisClient) SetUserBadgeUnreadCountSum(ctx context.Context, uid string, value int) error { - key := userBadgeUnreadCountSum + uid - return r.rdb.Set(context.Background(), key, value, 0).Err() -} -func (r *RedisClient) GetUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) { - key := userBadgeUnreadCountSum + uid - seq, err := r.rdb.Get(context.Background(), key).Result() - return utils.StringToInt(seq), err -} -func (r *RedisClient) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - n, err := r.rdb.Exists(context.Background(), key).Result() - return n > 0, err +func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) } -func (r *RedisClient) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - return r.rdb.HGetAll(context.Background(), key).Result() - -} -func (r *RedisClient) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - return r.rdb.HDel(context.Background(), key, subKey).Err() - -} -func (r *RedisClient) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - return r.rdb.Expire(context.Background(), key, expiration).Result() +func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) } -func (r *RedisClient) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - result, err := r.rdb.HGet(context.Background(), key, typeKey).Result() - return result, err +func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + return int(seq), utils.Wrap1(err) } -func (r *RedisClient) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - key := r.getMessageReactionExPrefix(clientMsgID, sessionType) - return r.rdb.HSet(context.Background(), key, typeKey, value).Err() - +func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) } -func (r *RedisClient) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { +func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) +} + +func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err() + return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err()) } -func (r *RedisClient) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + +func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return r.rdb.Del(context.Background(), key).Err() - + return utils.Wrap1(m.rdb.Del(ctx, key).Err()) } -func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { +func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { switch sessionType { case constant.SingleChatType: return "EX_SINGLE_" + clientMsgID @@ -560,3 +445,31 @@ func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType } return "" } + +func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { + n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() + if err != nil { + return false, utils.Wrap(err, "") + } + return n > 0, nil +} + +func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { + return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) +} + +func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { + return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) +} + +func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { + return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) +} + +func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) +} + +func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { + return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) +} diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 9db064a50..b706ecd09 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -1,11 +1,11 @@ package controller import ( - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/relation" relationTb "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/db/tx" "OpenIM/pkg/utils" "context" "encoding/json" diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index 58a838dc3..c783dd3d8 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -1,9 +1,9 @@ package controller import ( - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/db/tx" "OpenIM/pkg/utils" "context" "errors" diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 89f3fd9f6..a513c54ba 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -1,11 +1,11 @@ package controller import ( - "OpenIM/internal/tx" "OpenIM/pkg/common/constant" "OpenIM/pkg/common/db/cache" relationTb "OpenIM/pkg/common/db/table/relation" unRelationTb "OpenIM/pkg/common/db/table/unrelation" + "OpenIM/pkg/common/db/tx" "OpenIM/pkg/utils" "context" "fmt" diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 63baac03e..33ed7f882 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -50,6 +50,8 @@ type MsgDatabase interface { GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) // 设置群用户最小seq 直接调用cache SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) + // 设置用户最小seq 直接调用cache SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) @@ -79,7 +81,7 @@ func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase { type msgDatabase struct { mgo unRelationTb.MsgDocModelInterface - cache cache.MsgCache + cache cache.Cache msg unRelationTb.MsgDocModel ExtendMsg unRelationTb.ExtendMsgSetModelInterface rdb redis.Client @@ -685,3 +687,7 @@ func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID s func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return db.cache.SetUserMinSeq(ctx, userID, minSeq) } + +func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { + return db.cache.GetGroupUserMinSeq(ctx, groupID, userID) +} diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index 09b29c9bf..16b931c1f 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -10,7 +10,7 @@ type PushInterface interface { } type PushDataBase struct { - cache cache.MsgCache + cache cache.Cache } func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { diff --git a/internal/tx/gorm.go b/pkg/common/db/tx/gorm.go similarity index 100% rename from internal/tx/gorm.go rename to pkg/common/db/tx/gorm.go diff --git a/internal/tx/mongo.go b/pkg/common/db/tx/mongo.go similarity index 100% rename from internal/tx/mongo.go rename to pkg/common/db/tx/mongo.go diff --git a/internal/tx/tx.go b/pkg/common/db/tx/tx.go similarity index 100% rename from internal/tx/tx.go rename to pkg/common/db/tx/tx.go