config path

This commit is contained in:
wangchuxiao 2023-02-24 11:01:33 +08:00
parent dd7a7d9cda
commit bb94a7947c
24 changed files with 245 additions and 801 deletions

View File

@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan
type Fcm struct { type Fcm struct {
fcmMsgCli *messaging.Client fcmMsgCli *messaging.Client
cache cache.MsgCache cache cache.Cache
} }
func NewClient(cache cache.MsgCache) *Fcm { func NewClient(cache cache.Cache) *Fcm {
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
fcmApp, err := firebase.NewApp(context.Background(), nil, opt) fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil { if err != nil {

View File

@ -9,7 +9,7 @@ import (
) )
func Test_Push(t *testing.T) { func Test_Push(t *testing.T) {
var redis cache.MsgCache var redis cache.Cache
offlinePusher := NewClient(redis) offlinePusher := NewClient(redis)
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{}) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{})
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -38,12 +38,12 @@ const (
) )
type Client struct { type Client struct {
cache cache.MsgCache cache cache.Cache
tokenExpireTime int64 tokenExpireTime int64
taskIDTTL int64 taskIDTTL int64
} }
func NewClient(cache cache.MsgCache) *Client { func NewClient(cache cache.Cache) *Client {
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
} }

View File

@ -25,7 +25,7 @@ type RPCServer struct {
pusher Pusher pusher Pusher
} }
func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) { func (r *RPCServer) Init(rpcPort int, cache cache.Cache) {
r.rpcPort = rpcPort r.rpcPort = rpcPort
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName
} }

View File

@ -25,7 +25,7 @@ import (
) )
type Pusher struct { type Pusher struct {
cache cache.MsgCache cache cache.Cache
client discoveryregistry.SvcDiscoveryRegistry client discoveryregistry.SvcDiscoveryRegistry
offlinePusher OfflinePusher offlinePusher OfflinePusher
groupLocalCache localcache.GroupLocalCache groupLocalCache localcache.GroupLocalCache
@ -33,7 +33,7 @@ type Pusher struct {
successCount int successCount int
} }
func NewPusher(cache cache.MsgCache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher {
return &Pusher{ return &Pusher{
cache: cache, cache: cache,
client: client, client: client,

View File

@ -3,12 +3,12 @@ package conversation
import ( import (
"OpenIM/internal/common/check" "OpenIM/internal/common/check"
"OpenIM/internal/common/notification" "OpenIM/internal/common/notification"
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/controller"
"OpenIM/pkg/common/db/relation" "OpenIM/pkg/common/db/relation"
tableRelation "OpenIM/pkg/common/db/table/relation" tableRelation "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
pbConversation "OpenIM/pkg/proto/conversation" pbConversation "OpenIM/pkg/proto/conversation"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"

View File

@ -4,11 +4,11 @@ import (
"OpenIM/internal/common/check" "OpenIM/internal/common/check"
"OpenIM/internal/common/convert" "OpenIM/internal/common/convert"
"OpenIM/internal/common/notification" "OpenIM/internal/common/notification"
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/controller"
"OpenIM/pkg/common/db/relation" "OpenIM/pkg/common/db/relation"
tablerelation "OpenIM/pkg/common/db/table/relation" tablerelation "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/common/tokenverify" "OpenIM/pkg/common/tokenverify"
"OpenIM/pkg/common/tracelog" "OpenIM/pkg/common/tracelog"
registry "OpenIM/pkg/discoveryregistry" registry "OpenIM/pkg/discoveryregistry"

View File

@ -3,12 +3,12 @@ package group
import ( import (
"OpenIM/internal/common/check" "OpenIM/internal/common/check"
"OpenIM/internal/common/notification" "OpenIM/internal/common/notification"
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/db/controller" "OpenIM/pkg/common/db/controller"
"OpenIM/pkg/common/db/relation" "OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation" relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/common/db/unrelation" "OpenIM/pkg/common/db/unrelation"
"OpenIM/pkg/common/tokenverify" "OpenIM/pkg/common/tokenverify"
"OpenIM/pkg/common/tracelog" "OpenIM/pkg/common/tracelog"

View File

@ -9,7 +9,7 @@ import (
func (m *msgServer) DelMsgList(ctx context.Context, req *sdkws.DelMsgListReq) (*sdkws.DelMsgListResp, error) { func (m *msgServer) DelMsgList(ctx context.Context, req *sdkws.DelMsgListReq) (*sdkws.DelMsgListResp, error) {
resp := &sdkws.DelMsgListResp{} resp := &sdkws.DelMsgListResp{}
if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil { if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil {
return nil, err return nil, err
} }
DeleteMessageNotification(ctx, req.UserID, req.SeqList) DeleteMessageNotification(ctx, req.UserID, req.SeqList)
@ -21,14 +21,14 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup
if err := tokenverify.CheckAdmin(ctx); err != nil { if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err return nil, err
} }
//maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) //maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, req.GroupID)
//if err != nil { //if err != nil {
// return nil, err // return nil, err
//} //}
//if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { //if err := m.MsgDatabase.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
// return nil, err // return nil, err
//} //}
if err := m.MsgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { if err := m.MsgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
@ -39,10 +39,10 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
if err := m.MsgInterface.CleanUpUserMsg(ctx, req.UserID); err != nil { if err := m.MsgDatabase.CleanUpUserMsg(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
//if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { //if err := m.MsgDatabase.DelUserAllSeq(ctx, req.UserID); err != nil {
// return nil, err // return nil, err
//} //}
return resp, nil return resp, nil

View File

@ -27,7 +27,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false) notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false)
return resp, nil return resp, nil
} }
isExists, err := m.MsgInterface.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType) isExists, err := m.MsgDatabase.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -41,12 +41,12 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
return nil, err return nil, err
} }
v.LatestUpdateTime = utils.GetCurrentTimestampByMill() v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
if err := m.MsgInterface.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil {
return nil, err return nil, err
} }
} }
resp.IsReact = true resp.IsReact = true
_, err := m.MsgInterface.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -55,7 +55,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
if err != nil { if err != nil {
return nil, err return nil, err
} }
mongoValue, err := m.MsgInterface.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -9,7 +9,7 @@ import (
func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) { func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) {
resp := &pbMsg.SetSendMsgStatusResp{} resp := &pbMsg.SetSendMsgStatusResp{}
if err := m.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil { if err := m.MsgDatabase.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
@ -17,7 +17,7 @@ func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgS
func (m *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (*pbMsg.GetSendMsgStatusResp, error) { func (m *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (*pbMsg.GetSendMsgStatusResp, error) {
resp := &pbMsg.GetSendMsgStatusResp{} resp := &pbMsg.GetSendMsgStatusResp{}
status, err := m.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx)) status, err := m.MsgDatabase.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx))
if IsNotFound(err) { if IsNotFound(err) {
resp.Status = constant.MsgStatusNotExist resp.Status = constant.MsgStatusNotExist
return resp, nil return resp, nil

View File

@ -165,7 +165,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
} }
if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID { if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID {
resp, err := m.MsgInterface.GetSuperGroupMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)}) resp, err := m.MsgDatabase.GetSuperGroupMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -352,7 +352,7 @@ func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []strin
if v == "" || groupPB.MsgData.SendID == "" { if v == "" || groupPB.MsgData.SendID == "" {
return constant.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty") return constant.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty")
} }
err := m.MsgInterface.MsgToMQ(ctx, v, &msgToMQGroup) err := m.MsgDatabase.MsgToMQ(ctx, v, &msgToMQGroup)
if err != nil { if err != nil {
wg.Done() wg.Done()
return err return err

View File

@ -25,7 +25,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
return nil, err return nil, err
} }
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle) err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -42,12 +42,12 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
} }
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle) err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle) err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -74,13 +74,13 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
} }
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
if isSend { if isSend {
err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle) err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle)
if err != nil { if err != nil {
return nil, constant.ErrInternalServer.Wrap("insert to mq") return nil, constant.ErrInternalServer.Wrap("insert to mq")
} }
} }
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle) err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle)
if err != nil { if err != nil {
return nil, constant.ErrInternalServer.Wrap("insert to mq") return nil, constant.ErrInternalServer.Wrap("insert to mq")
} }
@ -255,11 +255,11 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg
func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
resp := new(sdkws.GetMaxAndMinSeqResp) resp := new(sdkws.GetMaxAndMinSeqResp)
m2 := make(map[string]*sdkws.MaxAndMinSeq) m2 := make(map[string]*sdkws.MaxAndMinSeq)
maxSeq, err := m.MsgInterface.GetUserMaxSeq(ctx, req.UserID) maxSeq, err := m.MsgDatabase.GetUserMaxSeq(ctx, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
minSeq, err := m.MsgInterface.GetUserMinSeq(ctx, req.UserID) minSeq, err := m.MsgDatabase.GetUserMinSeq(ctx, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -268,11 +268,11 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
if len(req.GroupIDList) > 0 { if len(req.GroupIDList) > 0 {
resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq) resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq)
for _, groupID := range req.GroupIDList { for _, groupID := range req.GroupIDList {
maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, groupID) maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
minSeq, err := m.MsgInterface.GetGroupMinSeq(ctx, groupID) minSeq, err := m.MsgDatabase.GetGroupMinSeq(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -287,13 +287,13 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)} resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
msgs, err := m.MsgInterface.GetMessagesBySeqs(ctx, req.UserID, req.Seqs) msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.List = msgs resp.List = msgs
for userID, list := range req.GroupSeqList { for userID, list := range req.GroupSeqList {
msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, userID, req.Seqs) msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, userID, req.Seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -16,7 +16,7 @@ import (
type msgServer struct { type msgServer struct {
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
MsgInterface controller.MsgDatabase MsgDatabase controller.MsgDatabase
Group *check.GroupChecker Group *check.GroupChecker
User *check.UserCheck User *check.UserCheck
Conversation *check.ConversationChecker Conversation *check.ConversationChecker
@ -45,7 +45,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
Conversation: check.NewConversationChecker(client), Conversation: check.NewConversationChecker(client),
User: check.NewUserCheck(client), User: check.NewUserCheck(client),
Group: check.NewGroupChecker(client), Group: check.NewGroupChecker(client),
//MsgInterface: controller.MsgInterface(), //MsgDatabase: controller.MsgDatabase(),
RegisterCenter: client, RegisterCenter: client,
GroupLocalCache: localcache.NewGroupMemberIDsLocalCache(client), GroupLocalCache: localcache.NewGroupMemberIDsLocalCache(client),
black: check.NewBlackChecker(client), black: check.NewBlackChecker(client),

View File

@ -1,475 +0,0 @@
package cache
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tracelog"
pbChat "OpenIM/pkg/proto/msg"
pbRtc "OpenIM/pkg/proto/rtc"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
SignalListCache = "SIGNAL_LIST_CACHE:"
SignalCache = "SIGNAL_CACHE:"
)
type MsgCache interface {
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
func NewMsgCache(client redis.UniversalClient) MsgCache {
return &msgCache{rdb: client}
}
type msgCache struct {
rdb redis.UniversalClient
}
func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
}
func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
}
func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64())
}
func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := m.rdb.Incr(ctx, key).Uint64()
return int64(seq), utils.Wrap1(err)
}
func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err())
}
func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err())
}
func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
m, err := m.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, nil
}
func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err())
}
func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
}
func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
var errResult error
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
seqMsg = append(seqMsg, &msg)
}
}
}
return seqMsg, failedSeqList, errResult
}
func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) {
pipe := m.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
return 0, utils.Wrap1(err)
}
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
if err != nil {
return 0, utils.Wrap1(err)
}
}
if len(failedList) != 0 {
return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
}
_, err := pipe.Exec(ctx)
return 0, err
}
func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
for _, msg := range msgList {
if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*"
vals, err := m.rdb.Keys(ctx, key).Result()
if err == redis.Nil {
return nil
}
if err != nil {
return utils.Wrap1(err)
}
for _, v := range vals {
if err := m.rdb.Del(ctx, v).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, utils.Wrap1(err)
}
var inviteeUserIDList []string
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
if !utils.Contain(pushToUserID, inviteeUserIDList...) {
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush"))
default:
return false, nil
}
if isInviteSignal {
for _, userID := range inviteeUserIDList {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, utils.Wrap1(err)
}
keyList := SignalListCache + userID
err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
if err != nil {
return false, utils.Wrap1(err)
}
err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
key := SignalCache + msg.ClientMsgID
err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
}
}
return true, nil
}
func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes()
if err != nil {
return nil, utils.Wrap1(err)
}
req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil {
return nil, utils.Wrap1(err)
}
invitationInfo = &pbRtc.SignalInviteReq{}
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
invitationInfo.OpUserID = req2.Invite.OpUserID
case *pbRtc.SignalReq_InviteInGroup:
invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, nil
}
func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, err
}
return invitationInfo, m.DelUserSignalList(ctx, userID)
}
func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err())
}
func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
for _, seq := range seqList {
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
continue
}
return utils.Wrap1(err)
}
var msg sdkws.MsgData
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
return utils.Wrap1(err)
}
if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result())
}
func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result())
}
func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
}
func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), utils.Wrap1(err)
}
func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
}
func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
}
func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), utils.Wrap1(err)
}
func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
}
func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return utils.Wrap1(m.rdb.Del(ctx, key).Err())
}
func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}
func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}

View File

@ -3,6 +3,7 @@ package cache
import ( import (
"OpenIM/pkg/common/config" "OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tracelog"
pbChat "OpenIM/pkg/proto/msg" pbChat "OpenIM/pkg/proto/msg"
pbRtc "OpenIM/pkg/proto/rtc" pbRtc "OpenIM/pkg/proto/rtc"
"OpenIM/pkg/proto/sdkws" "OpenIM/pkg/proto/sdkws"
@ -10,12 +11,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv" "strconv"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
) )
const ( const (
@ -37,6 +38,10 @@ const (
exTypeKeyLocker = "EX_LOCK:" exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:" uidPidToken = "UID_PID_TOKEN_STATUS:"
SignalListCache = "SIGNAL_LIST_CACHE:"
SignalCache = "SIGNAL_CACHE:"
) )
type Cache interface { type Cache interface {
@ -49,6 +54,7 @@ type Cache interface {
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
@ -74,8 +80,8 @@ type Cache interface {
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error) GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context) (int, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error) GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error DelFcmToken(ctx context.Context, account string, platformID int) error
@ -92,298 +98,227 @@ type Cache interface {
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
} }
// native redis operate func NewMsgCache(client redis.UniversalClient) Cache {
return &msgCache{rdb: client}
//func NewRedis() *RedisClient {
// o := &RedisClient{}
// o.InitRedis()
// return o
//}
func NewRedis() (*RedisClient, error) {
var rdb redis.UniversalClient
if config.Config.Redis.EnableCluster {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
//if err := rdb.Ping(ctx).Err();err != nil {
// return nil, fmt.Errorf("redis ping %w", err)
//}
//return &RedisClient{rdb: rdb}, nil
} else {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
//err := rdb.Ping(ctx).Err()
//if err != nil {
// panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
//}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err := rdb.Ping(ctx).Err()
if err != nil {
return nil, fmt.Errorf("redis ping %w", err)
}
return &RedisClient{rdb: rdb}, nil
} }
type RedisClient struct { type msgCache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
} }
func NewRedisClient(rdb redis.UniversalClient) *RedisClient { func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return &RedisClient{rdb: rdb} return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
} }
func (r *RedisClient) GetClient() redis.UniversalClient { func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return r.rdb return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
} }
// Perform seq auto-increment operation of user messages func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) { return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
key := userIncrSeq + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return seq, err
} }
// Get the largest Seq func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) { return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
key := userIncrSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return int64(utils.StringToInt(seq)), err
} }
// set the largest Seq func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error { return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64())
key := userIncrSeq + uid
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
} }
// Set the user's minimum seq func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) {
key := userMinSeq + uid
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
// Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) {
key := userMinSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err() return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(context.Background(), key).Result()
return int64(utils.StringToInt(seq)), err
} }
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
seq, err := r.rdb.Get(context.Background(), key).Result() seq, err := m.rdb.Incr(ctx, key).Uint64()
return int64(utils.StringToInt(seq)), err return int64(seq), utils.Wrap1(err)
} }
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(context.Background(), key).Result() return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err())
return seq, err
} }
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMaxSeq + groupID
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID key := groupMinSeq + groupID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err() return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
} }
// Store userid and platform class to redis func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HSet(context.Background(), key, token, flag).Err() return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err())
} }
//key:userID+platform-> <token, flag> func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID int) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID key := uidPidToken + userID + ":" + platformID
m, err := r.rdb.HGetAll(context.Background(), key).Result() m, err := m.rdb.HGetAll(ctx, key).Result()
mm := make(map[string]int) if err != nil {
for k, v := range m { return nil, utils.Wrap1(err)
mm[k] = utils.StringToInt(v)
}
return mm, err
}
func (r *RedisClient) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platform
m, err := r.rdb.HGetAll(context.Background(), key).Result()
if err != nil && err == redis.Nil {
return nil, nil
} }
mm := make(map[string]int) mm := make(map[string]int)
for k, v := range m { for k, v := range m {
mm[k] = utils.StringToInt(v) mm[k] = utils.StringToInt(v)
} }
return mm, utils.Wrap(err, "") return mm, nil
} }
func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + platform key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{}) mm := make(map[string]interface{})
for k, v := range m { for k, v := range m {
mm[k] = v mm[k] = v
} }
return r.rdb.HSet(context.Background(), key, mm).Err() return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err())
} }
func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + platform key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HDel(context.Background(), key, fields...).Err() return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
} }
func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) { func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
var errResult error
for _, v := range seqList { for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v)) key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := r.rdb.Get(context.Background(), key).Result() result, err := m.rdb.Get(ctx, key).Result()
if err != nil { if err != nil {
if err != redis.Nil { errResult = err
err2 = err failedSeqList = append(failedSeqList, v)
}
failedSeqs = append(failedSeqs, v)
} else { } else {
msg := sdkws.MsgData{} msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg) err = jsonpb.UnmarshalString(result, &msg)
if err != nil { if err != nil {
err2 = err errResult = err
failedSeqs = append(failedSeqs, v) failedSeqList = append(failedSeqList, v)
} else { } else {
seqMsgs = append(seqMsgs, &msg) seqMsg = append(seqMsg, &msg)
} }
} }
} }
return seqMsgs, failedSeqs, err2 return seqMsg, failedSeqList, errResult
} }
func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) { func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) {
pipe := r.rdb.Pipeline() pipe := m.rdb.Pipeline()
var failedMsgs []pbChat.MsgDataToMQ var failedList []pbChat.MsgDataToMQ
for _, msg := range msgs { for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData) s, err := utils.Pb2String(msg.MsgData)
if err != nil { if err != nil {
continue return 0, utils.Wrap1(err)
} }
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil { if err != nil {
failedMsgs = append(failedMsgs, *msg) return 0, utils.Wrap1(err)
} }
} }
if len(failedMsgs) != 0 { if len(failedList) != 0 {
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
} }
_, err := pipe.Exec(ctx) _, err := pipe.Exec(ctx)
return 0, err return 0, err
} }
func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error {
for _, msg := range msgs { func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) for _, msg := range msgList {
err := r.rdb.Del(ctx, key).Err() if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil {
if err != nil { return utils.Wrap1(err)
} }
} }
return nil return nil
} }
func (r *RedisClient) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*" key := messageCache + userID + "_" + "*"
vals, err := r.rdb.Keys(ctx, key).Result() vals, err := m.rdb.Keys(ctx, key).Result()
if err == redis.Nil { if err == redis.Nil {
return nil return nil
} }
if err != nil { if err != nil {
return utils.Wrap(err, "") return utils.Wrap1(err)
} }
for _, v := range vals { for _, v := range vals {
err = r.rdb.Del(ctx, v).Err() if err := m.rdb.Del(ctx, v).Err(); err != nil {
return utils.Wrap1(err)
}
} }
return nil return nil
} }
func (r *RedisClient) HandleSignalInfo(ctx context.Context, operationID string, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{} req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil { if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, err return false, utils.Wrap1(err)
} }
var inviteeUserIDs []string var inviteeUserIDList []string
var isInviteSignal bool var isInviteSignal bool
switch signalInfo := req.Payload.(type) { switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite: case *pbRtc.SignalReq_Invite:
inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup: case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true isInviteSignal = true
if !utils.IsContain(pushToUserID, inviteeUserIDs) { if !utils.Contain(pushToUserID, inviteeUserIDList...) {
return false, nil return false, nil
} }
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, nil return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush"))
default: default:
return false, nil return false, nil
} }
if isInviteSignal { if isInviteSignal {
for _, userID := range inviteeUserIDs { for _, userID := range inviteeUserIDList {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil { if err != nil {
return false, err return false, utils.Wrap1(err)
} }
keyList := signalListCache + userID keyList := SignalListCache + userID
err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err() err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
if err != nil { if err != nil {
return false, err return false, utils.Wrap1(err)
} }
err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
if err != nil { if err != nil {
return false, err return false, utils.Wrap1(err)
} }
key := signalCache + msg.ClientMsgID key := SignalCache + msg.ClientMsgID
err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil { if err != nil {
return false, err return false, utils.Wrap1(err)
} }
} }
} }
return true, nil return true, nil
} }
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key := signalCache + clientMsgID bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes()
invitationInfo = &pbRtc.SignalInviteReq{}
bytes, err := r.rdb.Get(context.Background(), key).Bytes()
if err != nil { if err != nil {
return nil, err return nil, utils.Wrap1(err)
} }
req := &pbRtc.SignalReq{} req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil { if err = proto.Unmarshal(bytes, req); err != nil {
return nil, err return nil, utils.Wrap1(err)
} }
invitationInfo = &pbRtc.SignalInviteReq{}
switch req2 := req.Payload.(type) { switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite: case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation invitationInfo.Invitation = req2.Invite.Invitation
@ -392,162 +327,112 @@ func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, c
invitationInfo.Invitation = req2.InviteInGroup.Invitation invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
} }
return invitationInfo, err
}
func (r *RedisClient) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := signalListCache + userID
result := r.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
key, err := result.Result()
if err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
err = r.DelUserSignalList(ctx, userID)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil return invitationInfo, nil
} }
func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) error { func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := signalListCache + userID key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result()
err := r.rdb.Del(context.Background(), keyList).Err() if err != nil {
return err return nil, utils.Wrap1(err)
}
invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, err
}
return invitationInfo, m.DelUserSignalList(ctx, userID)
} }
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) { func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err())
}
func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
for _, seq := range seqList { for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq)) key := messageCache + userID + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(context.Background(), key).Result() result, err := m.rdb.Get(ctx, key).Result()
if err != nil { if err != nil {
if err == redis.Nil { if err == redis.Nil {
} else { continue
} }
continue return utils.Wrap1(err)
} }
var msg sdkws.MsgData var msg sdkws.MsgData
if err := utils.String2Pb(result, &msg); err != nil { if err := jsonpb.UnmarshalString(result, &msg); err != nil {
continue return err
} }
msg.Status = constant.MsgDeleted msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg) s, err := utils.Pb2String(&msg)
if err != nil { if err != nil {
continue return utils.Wrap1(err)
} }
if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return utils.Wrap1(err)
} }
} }
return nil
} }
func (r *RedisClient) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
} }
func (r *RedisClient) GetGetuiToken(ctx context.Context) (string, error) { func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
result, err := r.rdb.Get(context.Background(), getuiToken).Result() return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result())
return result, err
} }
func (r *RedisClient) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err() return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
} }
func (r *RedisClient) GetGetuiTaskID(ctx context.Context) (string, error) { func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
result, err := r.rdb.Get(context.Background(), getuiTaskID).Result() return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result())
return result, err
} }
func (r *RedisClient) SetSendMsgStatus(ctx context.Context, status int32, operationID string) error { func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err() return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
} }
func (r *RedisClient) GetSendMsgStatus(ctx context.Context, operationID string) (int, error) { func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result() result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
if err != nil { return int32(result), utils.Wrap1(err)
return 0, err
}
status, err := strconv.Atoi(result)
return status, err
} }
func (r *RedisClient) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID) return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err()
} }
func (r *RedisClient) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID) return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
return r.rdb.Get(context.Background(), key).Result()
}
func (r *RedisClient) DelFcmToken(ctx context.Context, account string, platformID int) error {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Del(context.Background(), key).Err()
}
func (r *RedisClient) IncrUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return int(seq), err
}
func (r *RedisClient) SetUserBadgeUnreadCountSum(ctx context.Context, uid string, value int) error {
key := userBadgeUnreadCountSum + uid
return r.rdb.Set(context.Background(), key, value, 0).Err()
}
func (r *RedisClient) GetUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return utils.StringToInt(seq), err
}
func (r *RedisClient) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
n, err := r.rdb.Exists(context.Background(), key).Result()
return n > 0, err
} }
func (r *RedisClient) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType) return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
return r.rdb.HGetAll(context.Background(), key).Result()
}
func (r *RedisClient) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HDel(context.Background(), key, subKey).Err()
}
func (r *RedisClient) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.Expire(context.Background(), key, expiration).Result()
} }
func (r *RedisClient) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType) seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
result, err := r.rdb.HGet(context.Background(), key, typeKey).Result() return int(seq), utils.Wrap1(err)
return result, err
} }
func (r *RedisClient) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType) return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
return r.rdb.HSet(context.Background(), key, typeKey, value).Err()
} }
func (r *RedisClient) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
}
func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err() return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err())
} }
func (r *RedisClient) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.Del(context.Background(), key).Err() return utils.Wrap1(m.rdb.Del(ctx, key).Err())
} }
func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType { switch sessionType {
case constant.SingleChatType: case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID return "EX_SINGLE_" + clientMsgID
@ -560,3 +445,31 @@ func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType
} }
return "" return ""
} }
func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}

View File

@ -1,11 +1,11 @@
package controller package controller
import ( import (
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/db/relation" "OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation" relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"encoding/json" "encoding/json"

View File

@ -1,9 +1,9 @@
package controller package controller
import ( import (
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation" "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"errors" "errors"

View File

@ -1,11 +1,11 @@
package controller package controller
import ( import (
"OpenIM/internal/tx"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/cache"
relationTb "OpenIM/pkg/common/db/table/relation" relationTb "OpenIM/pkg/common/db/table/relation"
unRelationTb "OpenIM/pkg/common/db/table/unrelation" unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"fmt" "fmt"

View File

@ -50,6 +50,8 @@ type MsgDatabase interface {
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache // 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
// 设置用户最小seq 直接调用cache // 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
@ -79,7 +81,7 @@ func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase {
type msgDatabase struct { type msgDatabase struct {
mgo unRelationTb.MsgDocModelInterface mgo unRelationTb.MsgDocModelInterface
cache cache.MsgCache cache cache.Cache
msg unRelationTb.MsgDocModel msg unRelationTb.MsgDocModel
ExtendMsg unRelationTb.ExtendMsgSetModelInterface ExtendMsg unRelationTb.ExtendMsgSetModelInterface
rdb redis.Client rdb redis.Client
@ -685,3 +687,7 @@ func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID s
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq) return db.cache.SetUserMinSeq(ctx, userID, minSeq)
} }
func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
}

View File

@ -10,7 +10,7 @@ type PushInterface interface {
} }
type PushDataBase struct { type PushDataBase struct {
cache cache.MsgCache cache cache.Cache
} }
func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error {