From 2af2716c2c86c2453f2963554f69fc15d5489cf6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 23 Feb 2023 18:17:17 +0800 Subject: [PATCH] msg database --- go.mod | 2 +- internal/msgtransfer/modify_msg_handler.go | 2 +- .../msgtransfer/online_history_msg_handler.go | 2 +- .../online_msg_to_mongo_handler.go | 2 +- internal/push/fcm/push.go | 4 +- internal/push/fcm/push_test.go | 2 +- internal/push/getui/push.go | 4 +- internal/push/init.go | 2 +- internal/push/push_rpc_server.go | 2 +- internal/push/push_to_client.go | 4 +- internal/rpc/msg/delete.go | 4 +- pkg/common/db/cache/msg.go | 475 ++++++++++++++++++ pkg/common/db/cache/redis.go | 82 --- pkg/common/db/controller/msg.go | 214 +++----- pkg/common/db/controller/push.go | 2 +- .../db/table/unrelation/extend_msg_set.go | 5 +- pkg/utils/utils.go | 21 + 17 files changed, 588 insertions(+), 241 deletions(-) create mode 100644 pkg/common/db/cache/msg.go diff --git a/go.mod b/go.mod index 10bc1f60a..19d186640 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module Open_IM -go 1.16 +go 1.18 require ( firebase.google.com/go v3.13.0+incompatible diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 31b013115..a1d73832e 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -24,7 +24,7 @@ type ModifyMsgConsumerHandler struct { modifyMsgConsumerGroup *kfk.MConsumerGroup extendMsgInterface controller.ExtendMsgInterface - cache cache.Cache + cache cache.MsgCache } func (mmc *ModifyMsgConsumerHandler) Init() { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b533afd46..e69406958 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -57,7 +57,7 @@ type OnlineHistoryRedisConsumerHandler struct { producerToMongo *kafka.Producer msgInterface controller.MsgInterface - cache cache.Cache + cache cache.MsgCache } func (och *OnlineHistoryRedisConsumerHandler) Init() { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 9d1202321..841c7afe1 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,7 +19,7 @@ import ( type OnlineHistoryMongoConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup msgInterface controller.MsgInterface - cache cache.Cache + cache cache.MsgCache } func (mc *OnlineHistoryMongoConsumerHandler) Init() { diff --git a/internal/push/fcm/push.go b/internal/push/fcm/push.go index fcab72f29..f9754fb49 100644 --- a/internal/push/fcm/push.go +++ b/internal/push/fcm/push.go @@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan type Fcm struct { fcmMsgCli *messaging.Client - cache cache.Cache + cache cache.MsgCache } -func NewClient(cache cache.Cache) *Fcm { +func NewClient(cache cache.MsgCache) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/fcm/push_test.go b/internal/push/fcm/push_test.go index 4be39b685..05fdd9c39 100644 --- a/internal/push/fcm/push_test.go +++ b/internal/push/fcm/push_test.go @@ -9,7 +9,7 @@ import ( ) func Test_Push(t *testing.T) { - var redis cache.Cache + var redis cache.MsgCache offlinePusher := NewClient(redis) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{}) assert.Nil(t, err) diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 06b25252d..bb9fb0bef 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -37,12 +37,12 @@ const ( ) type Client struct { - cache cache.Cache + cache cache.MsgCache tokenExpireTime int64 taskIDTTL int64 } -func NewClient(cache cache.Cache) *Client { +func NewClient(cache cache.MsgCache) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} } diff --git a/internal/push/init.go b/internal/push/init.go index 51fcce428..8c0b0922a 100644 --- a/internal/push/init.go +++ b/internal/push/init.go @@ -26,7 +26,7 @@ type Push struct { } func (p *Push) Init(rpcPort int) { - var cacheInterface cache.Cache + var cacheInterface cache.MsgCache p.rpcServer.Init(rpcPort, cacheInterface) p.pushCh.Init() diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6efcc62dc..692cddc57 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -26,7 +26,7 @@ type RPCServer struct { push controller.PushInterface } -func (r *RPCServer) Init(rpcPort int, cache cache.Cache) { +func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName r.etcdSchema = config.Config.Etcd.EtcdSchema diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 3500ed8cc..239129a87 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -25,7 +25,7 @@ import ( ) type Pusher struct { - cache cache.Cache + cache cache.MsgCache client discoveryregistry.SvcDiscoveryRegistry offlinePusher OfflinePusher groupLocalCache localcache.GroupLocalCache @@ -33,7 +33,7 @@ type Pusher struct { successCount int } -func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { +func NewPusher(cache cache.MsgCache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { return &Pusher{ cache: cache, client: client, diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 5695912a0..8643bd4e3 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -3,11 +3,11 @@ package msg import ( "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/proto/msg" - common "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/proto/sdkws" "context" ) -func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) { +func (m *msgServer) DelMsgList(ctx context.Context, req *sdkws.DelMsgListReq) (*sdkws.DelMsgListResp, error) { resp := &common.DelMsgListResp{} if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil { return nil, err diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go new file mode 100644 index 000000000..0f7320c26 --- /dev/null +++ b/pkg/common/db/cache/msg.go @@ -0,0 +1,475 @@ +package cache + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/tracelog" + pbChat "Open_IM/pkg/proto/msg" + pbRtc "Open_IM/pkg/proto/rtc" + "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/utils" + "context" + "errors" + "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "strconv" + "time" + + "github.com/go-redis/redis/v8" +) + +const ( + userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq + appleDeviceToken = "DEVICE_TOKEN" + userMinSeq = "REDIS_USER_MIN_SEQ:" + + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + messageCache = "MESSAGE_CACHE:" + signalCache = "SIGNAL_CACHE:" + signalListCache = "SIGNAL_LIST_CACHE:" + FcmToken = "FCM_TOKEN:" + groupUserMinSeq = "GROUP_USER_MIN_SEQ:" + groupMaxSeq = "GROUP_MAX_SEQ:" + groupMinSeq = "GROUP_MIN_SEQ:" + sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" + userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" + exTypeKeyLocker = "EX_LOCK:" + + uidPidToken = "UID_PID_TOKEN_STATUS:" + + SignalListCache = "SIGNAL_LIST_CACHE:" + + SignalCache = "SIGNAL_CACHE:" +) + +type MsgCache interface { + IncrUserSeq(ctx context.Context, userID string) (int64, error) + GetUserMaxSeq(ctx context.Context, userID string) (int64, error) + SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error + SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) + GetUserMinSeq(ctx context.Context, userID string) (int64, error) + + SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) + GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error + SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error + + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + + GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) + + SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error + DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error + GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) + DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error + CleanUpOneUserAllMsg(ctx context.Context, userID string) error + HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) + GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) + GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) + DelUserSignalList(ctx context.Context, userID string) error + DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error + + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) + + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) + SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) + GetFcmToken(ctx context.Context, account string, platformID int) (string, error) + DelFcmToken(ctx context.Context, account string, platformID int) error + IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error + GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) + GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) + DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error + SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) + GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) + SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error + LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error +} + +func NewMsgCache(client redis.UniversalClient) MsgCache { + return &msgCache{rdb: client} +} + +type msgCache struct { + rdb redis.UniversalClient +} + +func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) +} + +func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) +} + +func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { + return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) +} + +func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) +} + +func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64()) +} + +func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { + key := groupUserMinSeq + "g:" + groupID + "u:" + userID + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + key := groupMaxSeq + groupID + seq, err := m.rdb.Incr(ctx, key).Uint64() + return int64(seq), utils.Wrap1(err) +} + +func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { + key := groupMaxSeq + groupID + return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err()) +} + +func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { + key := groupMinSeq + groupID + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err()) +} + +func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { + key := uidPidToken + userID + ":" + platformID + m, err := m.rdb.HGetAll(ctx, key).Result() + if err != nil { + return nil, utils.Wrap1(err) + } + mm := make(map[string]int) + for k, v := range m { + mm[k] = utils.StringToInt(v) + } + return mm, nil +} + +func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + mm := make(map[string]interface{}) + for k, v := range m { + mm[k] = v + } + return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err()) +} + +func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err()) +} + +func (m *msgCache) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) { + var errResult error + for _, v := range seqList { + //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + key := messageCache + userID + "_" + strconv.Itoa(int(v)) + result, err := m.rdb.Get(ctx, key).Result() + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + } else { + msg := sdkws.MsgData{} + err = jsonpb.UnmarshalString(result, &msg) + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + } else { + seqMsg = append(seqMsg, &msg) + } + + } + } + return seqMsg, failedSeqList, errResult +} + +func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) { + pipe := m.rdb.Pipeline() + var failedList []pbChat.MsgDataToMQ + for _, msg := range msgList { + key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + s, err := utils.Pb2String(msg.MsgData) + if err != nil { + return 0, utils.Wrap1(err) + } + err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + if err != nil { + return 0, utils.Wrap1(err) + } + } + if len(failedList) != 0 { + return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx))) + } + _, err := pipe.Exec(ctx) + return 0, err +} + +func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { + for _, msg := range msgList { + if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { + key := messageCache + userID + "_" + "*" + vals, err := m.rdb.Keys(ctx, key).Result() + if err == redis.Nil { + return nil + } + if err != nil { + return utils.Wrap1(err) + } + for _, v := range vals { + if err := m.rdb.Del(ctx, v).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { + req := &pbRtc.SignalReq{} + if err := proto.Unmarshal(msg.Content, req); err != nil { + return false, utils.Wrap1(err) + } + var inviteeUserIDList []string + var isInviteSignal bool + switch signalInfo := req.Payload.(type) { + case *pbRtc.SignalReq_Invite: + inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList + isInviteSignal = true + case *pbRtc.SignalReq_InviteInGroup: + inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList + isInviteSignal = true + if !utils.Contain(pushToUserID, inviteeUserIDList...) { + return false, nil + } + case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: + return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush")) + default: + return false, nil + } + if isInviteSignal { + for _, userID := range inviteeUserIDList { + timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) + if err != nil { + return false, utils.Wrap1(err) + } + keyList := SignalListCache + userID + err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err() + if err != nil { + return false, utils.Wrap1(err) + } + err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, utils.Wrap1(err) + } + key := SignalCache + msg.ClientMsgID + err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, utils.Wrap1(err) + } + } + } + return true, nil +} + +func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes() + if err != nil { + return nil, utils.Wrap1(err) + } + req := &pbRtc.SignalReq{} + if err = proto.Unmarshal(bytes, req); err != nil { + return nil, utils.Wrap1(err) + } + invitationInfo = &pbRtc.SignalInviteReq{} + switch req2 := req.Payload.(type) { + case *pbRtc.SignalReq_Invite: + invitationInfo.Invitation = req2.Invite.Invitation + invitationInfo.OpUserID = req2.Invite.OpUserID + case *pbRtc.SignalReq_InviteInGroup: + invitationInfo.Invitation = req2.InviteInGroup.Invitation + invitationInfo.OpUserID = req2.InviteInGroup.OpUserID + } + return invitationInfo, nil +} + +func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result() + if err != nil { + return nil, utils.Wrap1(err) + } + invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key) + if err != nil { + return nil, err + } + return invitationInfo, m.DelUserSignalList(ctx, userID) +} + +func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error { + return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err()) +} + +func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error { + for _, seq := range seqList { + key := messageCache + userID + "_" + strconv.Itoa(int(seq)) + result, err := m.rdb.Get(ctx, key).Result() + if err != nil { + if err == redis.Nil { + continue + } + return utils.Wrap1(err) + } + var msg sdkws.MsgData + if err := jsonpb.UnmarshalString(result, &msg); err != nil { + return err + } + msg.Status = constant.MsgDeleted + s, err := utils.Pb2String(&msg) + if err != nil { + return utils.Wrap1(err) + } + if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result()) +} + +func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result()) +} + +func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) +} + +func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int() + return int32(result), utils.Wrap1(err) +} + +func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) +} + +func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) +} + +func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + return int(seq), utils.Wrap1(err) +} + +func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) +} + +func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) +} + +func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err()) +} + +func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return utils.Wrap1(m.rdb.Del(ctx, key).Err()) +} + +func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { + switch sessionType { + case constant.SingleChatType: + return "EX_SINGLE_" + clientMsgID + case constant.GroupChatType: + return "EX_GROUP_" + clientMsgID + case constant.SuperGroupChatType: + return "EX_SUPER_GROUP_" + clientMsgID + case constant.NotificationChatType: + return "EX_NOTIFICATION" + clientMsgID + } + return "" +} + +func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { + n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() + if err != nil { + return false, utils.Wrap(err, "") + } + return n > 0, nil +} + +func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { + return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) +} + +func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { + return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) +} + +func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { + return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) +} + +func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) +} + +func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { + return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) +} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 05129f93a..f325b1de0 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -18,88 +18,6 @@ import ( "github.com/golang/protobuf/proto" ) -const ( - userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq - appleDeviceToken = "DEVICE_TOKEN" - userMinSeq = "REDIS_USER_MIN_SEQ:" - - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - FcmToken = "FCM_TOKEN:" - groupUserMinSeq = "GROUP_USER_MIN_SEQ:" - groupMaxSeq = "GROUP_MAX_SEQ:" - groupMinSeq = "GROUP_MIN_SEQ:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" - exTypeKeyLocker = "EX_LOCK:" - - uidPidToken = "UID_PID_TOKEN_STATUS:" -) - -type Cache interface { - IncrUserSeq(ctx context.Context, userID string) (int64, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error - SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error - - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error - - GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) - - SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error - DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error - GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) - DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error - CleanUpOneUserAllMsg(ctx context.Context, userID string) error - HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) - GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) - GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) - DelUserSignalList(ctx context.Context, userID string) error - DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error - - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) - - SetSendMsgStatus(ctx context.Context, status int32) error - GetSendMsgStatus(ctx context.Context) (int, error) - SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) - GetFcmToken(ctx context.Context, account string, platformID int) (string, error) - DelFcmToken(ctx context.Context, account string, platformID int) error - IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error - GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) - DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) - GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error -} - -// native redis operate - -//func NewRedis() *RedisClient { -// o := &RedisClient{} -// o.InitRedis() -// return o -//} - func NewRedis() (*RedisClient, error) { var rdb redis.UniversalClient if config.Config.Redis.EnableCluster { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index df306f3ca..d9d39f283 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/prome" "Open_IM/pkg/common/tracelog" + "fmt" "github.com/gogo/protobuf/sortkeys" "sync" "time" @@ -23,97 +24,6 @@ import ( "github.com/golang/protobuf/proto" ) -//type MsgInterface interface { -// // 批量插入消息到db -// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error -// // 刪除redis中消息缓存 -// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error -// // incrSeq然后批量插入缓存 -// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) -// // 删除消息 返回不存在的seqList -// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) -// // 通过seqList获取db中写扩散消息 -// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) -// // 通过seqList获取大群在db里面的消息 没找到返回错误 -// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) -// // 删除用户所有消息/cache/db然后重置seq -// CleanUpUserMsg(ctx context.Context, userID string) error -// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) -// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error -// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) -// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error -// // 获取用户 seq mongo和redis -// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) -// // 获取群 seq mongo和redis -// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) -// // 设置群用户最小seq 直接调用cache -// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) -// // 设置用户最小seq 直接调用cache -// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) -// -// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error) -//} -// -//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface { -// return &MsgController{} -//} -// -//type MsgController struct { -// database MsgDatabase -//} -// -//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { -// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq) -//} -// -//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error { -// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList) -//} -// -//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { -// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList) -//} -// -//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { -// return m.database.DelMsgBySeqs(ctx, userID, seqs) -//} -// -//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { -// return m.database.GetMsgBySeqs(ctx, userID, seqs) -//} -// -//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { -// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) -//} -// -//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error { -// return m.database.CleanUpUserMsg(ctx, userID) -//} -// -//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { -// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime) -//} -// -//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { -// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) -//} -// -//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { -// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID) -//} -// -//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { -// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) -//} -// -//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { -// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) -//} -// -//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { -// return m.database.SetUserMinSeq(ctx, userID, minSeq) -//} - type MsgDatabaseInterface interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error @@ -154,8 +64,8 @@ type MsgDatabaseInterface interface { GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error - SetSendMsgStatus(ctx context.Context, userID string, status int32) error - GetSendMsgStatus(ctx context.Context, userID string) (int32, error) + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error GetUserMaxSeq(ctx context.Context, userID string) (int64, error) GetUserMinSeq(ctx context.Context, userID string) (int64, error) @@ -163,65 +73,97 @@ type MsgDatabaseInterface interface { GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) } + +func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { + return &MsgDatabase{} +} + type MsgDatabase struct { - mgo unRelationTb.MsgDocModelInterface - cache cache.Cache - msg unRelationTb.MsgDocModel + mgo unRelationTb.MsgDocModelInterface + cache cache.MsgCache + msg unRelationTb.MsgDocModel + ExtendMsg unRelationTb.ExtendMsgSetModelInterface + rdb redis.Client +} + +func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel { + r := make(map[string]unRelationTb.KeyValueModel) + for key, value := range reactionExtensionList { + r[key] = unRelationTb.KeyValueModel{ + TypeKey: value.TypeKey, + Value: value.Value, + LatestUpdateTime: value.LatestUpdateTime, + } + } + return r } func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - //TODO implement me - panic("implement me") + return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType) } func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - //TODO implement me - panic("implement me") + return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value) } func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - //TODO implement me - panic("implement me") -} - -func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { - //TODO implement me - panic("implement me") -} - -func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { - //TODO implement me - panic("implement me") + return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration) } func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - //TODO implement me - panic("implement me") + return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey) } func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - //TODO implement me - panic("implement me") + return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType) } func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - //TODO implement me - panic("implement me") + return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey) +} + +func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { + return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) +} + +func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { + extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime) + if err != nil { + return nil, err + } + extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID] + if !ok { + return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID)) + } + reactionExtensionList := make(map[string]*pbMsg.KeyValueResp) + for key, model := range extendMsg.ReactionExtensionList { + reactionExtensionList[key] = &pbMsg.KeyValueResp{ + KeyValue: &sdkws.KeyValue{ + TypeKey: model.TypeKey, + Value: model.Value, + LatestUpdateTime: model.LatestUpdateTime, + }, + } + } + return &pbMsg.ExtendMsg{ + ReactionExtensionList: reactionExtensionList, + ClientMsgID: extendMsg.ClientMsgID, + MsgFirstModifyTime: extendMsg.MsgFirstModifyTime, + AttachedInfo: extendMsg.AttachedInfo, + Ex: extendMsg.Ex, + }, nil } func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { - //TODO implement me - panic("implement me") + return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) } -func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return db.cache.SetSendMsgStatus(ctx, id, status) } -func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + return db.cache.GetSendMsgStatus(ctx, id) } func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { @@ -230,32 +172,24 @@ func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDat } func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - //TODO implement me - panic("implement me") + return db.cache.GetUserMaxSeq(ctx, userID) } func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - //TODO implement me - panic("implement me") + return db.cache.GetUserMinSeq(ctx, userID) } func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - //TODO implement me - panic("implement me") + return db.cache.GetGroupMaxSeq(ctx, groupID) } func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - //TODO implement me - panic("implement me") + return db.cache.GetGroupMinSeq(ctx, groupID) } func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) { - //TODO implement me - panic("implement me") -} - -func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { - return &MsgDatabase{} + seqMsg, _, err := db.cache.GetMessageListBySeq(ctx, userID, seqs) + return seqMsg, err } func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index cca942487..ab2f7fdaa 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -10,7 +10,7 @@ type PushInterface interface { } type PushDataBase struct { - cache cache.Cache + cache cache.MsgCache } func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { diff --git a/pkg/common/db/table/unrelation/extend_msg_set.go b/pkg/common/db/table/unrelation/extend_msg_set.go index e691776d3..3eb99aff6 100644 --- a/pkg/common/db/table/unrelation/extend_msg_set.go +++ b/pkg/common/db/table/unrelation/extend_msg_set.go @@ -1,7 +1,6 @@ package unrelation import ( - "Open_IM/pkg/proto/sdkws" "context" "strconv" "strings" @@ -41,8 +40,8 @@ type ExtendMsgSetModelInterface interface { GetAllExtendMsgSet(ctx context.Context, sourceID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSetModel, err error) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*ExtendMsgSetModel, error) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *ExtendMsgModel) error - InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error - DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error + InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]KeyValueModel) error + DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]KeyValueModel) error TakeExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsgModel, err error) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 9cc9b6908..213d93200 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -21,6 +21,27 @@ func CopyStructFields(a interface{}, b interface{}, fields ...string) (err error return copier.Copy(a, b) } +func Wrap1(err error) error { + if err != nil { + return Wrap(err, "") + } + return nil +} + +func Wrap2[T any](a T, err error) (T, error) { + if err != nil { + return a, Wrap(err, "") + } + return a, nil +} + +func Wrap3[T any, V any](a T, b V, err error) (T, V, error) { + if err != nil { + return a, b, Wrap(err, "") + } + return a, b, nil +} + func Wrap(err error, message string) error { return errors.Wrap(err, "==> "+printCallerNameAndLine()+message) }