mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
# Conflicts: # internal/rpc/msg/delete.go # pkg/common/db/controller/msg.go # pkg/common/db/table/unrelation/extend_msg_set.go
This commit is contained in:
commit
be11da75ae
2
go.mod
2
go.mod
@ -1,6 +1,6 @@
|
|||||||
module OpenIM
|
module OpenIM
|
||||||
|
|
||||||
go 1.16
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
firebase.google.com/go v3.13.0+incompatible
|
firebase.google.com/go v3.13.0+incompatible
|
||||||
|
@ -24,7 +24,7 @@ type ModifyMsgConsumerHandler struct {
|
|||||||
modifyMsgConsumerGroup *kfk.MConsumerGroup
|
modifyMsgConsumerGroup *kfk.MConsumerGroup
|
||||||
|
|
||||||
extendMsgInterface controller.ExtendMsgInterface
|
extendMsgInterface controller.ExtendMsgInterface
|
||||||
cache cache.Cache
|
cache cache.MsgCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mmc *ModifyMsgConsumerHandler) Init() {
|
func (mmc *ModifyMsgConsumerHandler) Init() {
|
||||||
|
@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan
|
|||||||
|
|
||||||
type Fcm struct {
|
type Fcm struct {
|
||||||
fcmMsgCli *messaging.Client
|
fcmMsgCli *messaging.Client
|
||||||
cache cache.Cache
|
cache cache.MsgCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(cache cache.Cache) *Fcm {
|
func NewClient(cache cache.MsgCache) *Fcm {
|
||||||
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
|
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
|
||||||
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
|
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Test_Push(t *testing.T) {
|
func Test_Push(t *testing.T) {
|
||||||
var redis cache.Cache
|
var redis cache.MsgCache
|
||||||
offlinePusher := NewClient(redis)
|
offlinePusher := NewClient(redis)
|
||||||
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{})
|
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
@ -38,12 +38,12 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
cache cache.Cache
|
cache cache.MsgCache
|
||||||
tokenExpireTime int64
|
tokenExpireTime int64
|
||||||
taskIDTTL int64
|
taskIDTTL int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(cache cache.Cache) *Client {
|
func NewClient(cache cache.MsgCache) *Client {
|
||||||
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
|
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ type RPCServer struct {
|
|||||||
pusher Pusher
|
pusher Pusher
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RPCServer) Init(rpcPort int, cache cache.Cache) {
|
func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) {
|
||||||
r.rpcPort = rpcPort
|
r.rpcPort = rpcPort
|
||||||
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName
|
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Pusher struct {
|
type Pusher struct {
|
||||||
cache cache.Cache
|
cache cache.MsgCache
|
||||||
client discoveryregistry.SvcDiscoveryRegistry
|
client discoveryregistry.SvcDiscoveryRegistry
|
||||||
offlinePusher OfflinePusher
|
offlinePusher OfflinePusher
|
||||||
groupLocalCache localcache.GroupLocalCache
|
groupLocalCache localcache.GroupLocalCache
|
||||||
@ -33,7 +33,7 @@ type Pusher struct {
|
|||||||
successCount int
|
successCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher {
|
func NewPusher(cache cache.MsgCache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher {
|
||||||
return &Pusher{
|
return &Pusher{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
client: client,
|
client: client,
|
||||||
|
475
pkg/common/db/cache/msg.go
vendored
Normal file
475
pkg/common/db/cache/msg.go
vendored
Normal file
@ -0,0 +1,475 @@
|
|||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/pkg/common/config"
|
||||||
|
"Open_IM/pkg/common/constant"
|
||||||
|
"Open_IM/pkg/common/tracelog"
|
||||||
|
pbChat "Open_IM/pkg/proto/msg"
|
||||||
|
pbRtc "Open_IM/pkg/proto/rtc"
|
||||||
|
"Open_IM/pkg/proto/sdkws"
|
||||||
|
"Open_IM/pkg/utils"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/golang/protobuf/jsonpb"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
|
||||||
|
appleDeviceToken = "DEVICE_TOKEN"
|
||||||
|
userMinSeq = "REDIS_USER_MIN_SEQ:"
|
||||||
|
|
||||||
|
getuiToken = "GETUI_TOKEN"
|
||||||
|
getuiTaskID = "GETUI_TASK_ID"
|
||||||
|
messageCache = "MESSAGE_CACHE:"
|
||||||
|
signalCache = "SIGNAL_CACHE:"
|
||||||
|
signalListCache = "SIGNAL_LIST_CACHE:"
|
||||||
|
FcmToken = "FCM_TOKEN:"
|
||||||
|
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
|
||||||
|
groupMaxSeq = "GROUP_MAX_SEQ:"
|
||||||
|
groupMinSeq = "GROUP_MIN_SEQ:"
|
||||||
|
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||||
|
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
|
||||||
|
exTypeKeyLocker = "EX_LOCK:"
|
||||||
|
|
||||||
|
uidPidToken = "UID_PID_TOKEN_STATUS:"
|
||||||
|
|
||||||
|
SignalListCache = "SIGNAL_LIST_CACHE:"
|
||||||
|
|
||||||
|
SignalCache = "SIGNAL_CACHE:"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MsgCache interface {
|
||||||
|
IncrUserSeq(ctx context.Context, userID string) (int64, error)
|
||||||
|
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
|
||||||
|
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
|
||||||
|
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
||||||
|
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
|
||||||
|
|
||||||
|
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
||||||
|
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
|
||||||
|
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
|
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
|
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
|
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
|
||||||
|
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
|
||||||
|
|
||||||
|
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
|
||||||
|
|
||||||
|
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
|
||||||
|
|
||||||
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
|
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||||
|
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
||||||
|
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
||||||
|
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
||||||
|
HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
|
||||||
|
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
|
||||||
|
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
|
||||||
|
DelUserSignalList(ctx context.Context, userID string) error
|
||||||
|
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
|
||||||
|
|
||||||
|
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
|
||||||
|
GetGetuiToken(ctx context.Context) (string, error)
|
||||||
|
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
|
||||||
|
GetGetuiTaskID(ctx context.Context) (string, error)
|
||||||
|
|
||||||
|
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||||
|
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||||
|
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
|
||||||
|
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
|
||||||
|
DelFcmToken(ctx context.Context, account string, platformID int) error
|
||||||
|
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
|
||||||
|
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
|
||||||
|
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
|
||||||
|
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
||||||
|
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
|
||||||
|
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
|
||||||
|
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
||||||
|
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
|
||||||
|
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
||||||
|
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
|
||||||
|
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMsgCache(client redis.UniversalClient) MsgCache {
|
||||||
|
return &msgCache{rdb: client}
|
||||||
|
}
|
||||||
|
|
||||||
|
type msgCache struct {
|
||||||
|
rdb redis.UniversalClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
||||||
|
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
|
||||||
|
key := groupMaxSeq + groupID
|
||||||
|
seq, err := m.rdb.Incr(ctx, key).Uint64()
|
||||||
|
return int64(seq), utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
|
||||||
|
key := groupMaxSeq + groupID
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
|
||||||
|
key := groupMinSeq + groupID
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||||
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
||||||
|
return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
|
||||||
|
key := uidPidToken + userID + ":" + platformID
|
||||||
|
m, err := m.rdb.HGetAll(ctx, key).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
mm := make(map[string]int)
|
||||||
|
for k, v := range m {
|
||||||
|
mm[k] = utils.StringToInt(v)
|
||||||
|
}
|
||||||
|
return mm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
|
||||||
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
||||||
|
mm := make(map[string]interface{})
|
||||||
|
for k, v := range m {
|
||||||
|
mm[k] = v
|
||||||
|
}
|
||||||
|
return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
|
||||||
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
||||||
|
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
|
||||||
|
var errResult error
|
||||||
|
for _, v := range seqList {
|
||||||
|
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
|
||||||
|
key := messageCache + userID + "_" + strconv.Itoa(int(v))
|
||||||
|
result, err := m.rdb.Get(ctx, key).Result()
|
||||||
|
if err != nil {
|
||||||
|
errResult = err
|
||||||
|
failedSeqList = append(failedSeqList, v)
|
||||||
|
} else {
|
||||||
|
msg := sdkws.MsgData{}
|
||||||
|
err = jsonpb.UnmarshalString(result, &msg)
|
||||||
|
if err != nil {
|
||||||
|
errResult = err
|
||||||
|
failedSeqList = append(failedSeqList, v)
|
||||||
|
} else {
|
||||||
|
seqMsg = append(seqMsg, &msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return seqMsg, failedSeqList, errResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) {
|
||||||
|
pipe := m.rdb.Pipeline()
|
||||||
|
var failedList []pbChat.MsgDataToMQ
|
||||||
|
for _, msg := range msgList {
|
||||||
|
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
|
||||||
|
s, err := utils.Pb2String(msg.MsgData)
|
||||||
|
if err != nil {
|
||||||
|
return 0, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
|
||||||
|
if err != nil {
|
||||||
|
return 0, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(failedList) != 0 {
|
||||||
|
return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
|
||||||
|
}
|
||||||
|
_, err := pipe.Exec(ctx)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
|
||||||
|
for _, msg := range msgList {
|
||||||
|
if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil {
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
|
||||||
|
key := messageCache + userID + "_" + "*"
|
||||||
|
vals, err := m.rdb.Keys(ctx, key).Result()
|
||||||
|
if err == redis.Nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
for _, v := range vals {
|
||||||
|
if err := m.rdb.Del(ctx, v).Err(); err != nil {
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
|
||||||
|
req := &pbRtc.SignalReq{}
|
||||||
|
if err := proto.Unmarshal(msg.Content, req); err != nil {
|
||||||
|
return false, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
var inviteeUserIDList []string
|
||||||
|
var isInviteSignal bool
|
||||||
|
switch signalInfo := req.Payload.(type) {
|
||||||
|
case *pbRtc.SignalReq_Invite:
|
||||||
|
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
|
||||||
|
isInviteSignal = true
|
||||||
|
case *pbRtc.SignalReq_InviteInGroup:
|
||||||
|
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
|
||||||
|
isInviteSignal = true
|
||||||
|
if !utils.Contain(pushToUserID, inviteeUserIDList...) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
|
||||||
|
return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush"))
|
||||||
|
default:
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if isInviteSignal {
|
||||||
|
for _, userID := range inviteeUserIDList {
|
||||||
|
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
|
||||||
|
if err != nil {
|
||||||
|
return false, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
keyList := SignalListCache + userID
|
||||||
|
err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
|
||||||
|
if err != nil {
|
||||||
|
return false, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
|
||||||
|
if err != nil {
|
||||||
|
return false, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
key := SignalCache + msg.ClientMsgID
|
||||||
|
err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
|
||||||
|
if err != nil {
|
||||||
|
return false, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
|
||||||
|
bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes()
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
req := &pbRtc.SignalReq{}
|
||||||
|
if err = proto.Unmarshal(bytes, req); err != nil {
|
||||||
|
return nil, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
invitationInfo = &pbRtc.SignalInviteReq{}
|
||||||
|
switch req2 := req.Payload.(type) {
|
||||||
|
case *pbRtc.SignalReq_Invite:
|
||||||
|
invitationInfo.Invitation = req2.Invite.Invitation
|
||||||
|
invitationInfo.OpUserID = req2.Invite.OpUserID
|
||||||
|
case *pbRtc.SignalReq_InviteInGroup:
|
||||||
|
invitationInfo.Invitation = req2.InviteInGroup.Invitation
|
||||||
|
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
|
||||||
|
}
|
||||||
|
return invitationInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
|
||||||
|
key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return invitationInfo, m.DelUserSignalList(ctx, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
|
||||||
|
return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
|
||||||
|
for _, seq := range seqList {
|
||||||
|
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
|
||||||
|
result, err := m.rdb.Get(ctx, key).Result()
|
||||||
|
if err != nil {
|
||||||
|
if err == redis.Nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
var msg sdkws.MsgData
|
||||||
|
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
msg.Status = constant.MsgDeleted
|
||||||
|
s, err := utils.Pb2String(&msg)
|
||||||
|
if err != nil {
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
|
||||||
|
return utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
|
||||||
|
result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
|
||||||
|
return int32(result), utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
|
||||||
|
return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
||||||
|
seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
|
||||||
|
return int(seq), utils.Wrap1(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
|
||||||
|
return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
||||||
|
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
||||||
|
return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
||||||
|
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
||||||
|
return utils.Wrap1(m.rdb.Del(ctx, key).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
|
||||||
|
switch sessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
return "EX_SINGLE_" + clientMsgID
|
||||||
|
case constant.GroupChatType:
|
||||||
|
return "EX_GROUP_" + clientMsgID
|
||||||
|
case constant.SuperGroupChatType:
|
||||||
|
return "EX_SUPER_GROUP_" + clientMsgID
|
||||||
|
case constant.NotificationChatType:
|
||||||
|
return "EX_NOTIFICATION" + clientMsgID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
||||||
|
n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
|
||||||
|
if err != nil {
|
||||||
|
return false, utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
return n > 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
|
||||||
|
return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
||||||
|
return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
|
||||||
|
return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
|
||||||
|
return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
|
||||||
|
return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
|
||||||
|
}
|
2
pkg/common/db/cache/redis.go
vendored
2
pkg/common/db/cache/redis.go
vendored
@ -59,7 +59,7 @@ type Cache interface {
|
|||||||
|
|
||||||
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||||
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
||||||
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
||||||
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
||||||
|
@ -1,20 +1,21 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"OpenIM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"OpenIM/pkg/common/db/cache"
|
"Open_IM/pkg/common/db/cache"
|
||||||
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
|
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
|
||||||
"OpenIM/pkg/common/db/unrelation"
|
"Open_IM/pkg/common/db/unrelation"
|
||||||
"OpenIM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
"OpenIM/pkg/common/prome"
|
"Open_IM/pkg/common/prome"
|
||||||
"OpenIM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
|
"fmt"
|
||||||
"github.com/gogo/protobuf/sortkeys"
|
"github.com/gogo/protobuf/sortkeys"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
pbMsg "OpenIM/pkg/proto/msg"
|
pbMsg "Open_IM/pkg/proto/msg"
|
||||||
"OpenIM/pkg/proto/sdkws"
|
"Open_IM/pkg/proto/sdkws"
|
||||||
"OpenIM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
@ -23,98 +24,7 @@ import (
|
|||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
//type MsgInterface interface {
|
type MsgDatabaseInterface interface {
|
||||||
// // 批量插入消息到db
|
|
||||||
// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
|
|
||||||
// // 刪除redis中消息缓存
|
|
||||||
// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
|
|
||||||
// // incrSeq然后批量插入缓存
|
|
||||||
// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
|
|
||||||
// // 删除消息 返回不存在的seqList
|
|
||||||
// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
|
|
||||||
// // 通过seqList获取db中写扩散消息
|
|
||||||
// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
|
||||||
// // 通过seqList获取大群在db里面的消息 没找到返回错误
|
|
||||||
// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
|
||||||
// // 删除用户所有消息/cache/db然后重置seq
|
|
||||||
// CleanUpUserMsg(ctx context.Context, userID string) error
|
|
||||||
// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
|
||||||
// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
|
|
||||||
// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
|
||||||
// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
|
|
||||||
// // 获取用户 seq mongo和redis
|
|
||||||
// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
|
||||||
// // 获取群 seq mongo和redis
|
|
||||||
// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
|
|
||||||
// // 设置群用户最小seq 直接调用cache
|
|
||||||
// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
|
||||||
// // 设置用户最小seq 直接调用cache
|
|
||||||
// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
|
||||||
//
|
|
||||||
// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
|
|
||||||
// return &MsgController{}
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//type MsgController struct {
|
|
||||||
// database MsgDatabase
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
|
|
||||||
// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
|
|
||||||
// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
|
|
||||||
// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
|
|
||||||
// return m.database.DelMsgBySeqs(ctx, userID, seqs)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
|
||||||
// return m.database.GetMsgBySeqs(ctx, userID, seqs)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
|
||||||
// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error {
|
|
||||||
// return m.database.CleanUpUserMsg(ctx, userID)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
|
||||||
// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
|
||||||
// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
|
||||||
// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
|
|
||||||
// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
|
||||||
// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
|
||||||
// return m.database.SetUserMinSeq(ctx, userID, minSeq)
|
|
||||||
//}
|
|
||||||
|
|
||||||
type MsgDatabase interface {
|
|
||||||
// 批量插入消息
|
// 批量插入消息
|
||||||
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
|
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
|
||||||
// 刪除redis中消息缓存
|
// 刪除redis中消息缓存
|
||||||
@ -140,12 +50,11 @@ type MsgDatabase interface {
|
|||||||
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
|
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
|
||||||
// 设置群用户最小seq 直接调用cache
|
// 设置群用户最小seq 直接调用cache
|
||||||
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
||||||
//
|
|
||||||
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
|
|
||||||
// 设置用户最小seq 直接调用cache
|
// 设置用户最小seq 直接调用cache
|
||||||
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
||||||
|
|
||||||
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
||||||
|
|
||||||
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
||||||
|
|
||||||
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
||||||
@ -155,105 +64,135 @@ type MsgDatabase interface {
|
|||||||
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
|
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
|
||||||
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
|
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
|
||||||
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
|
||||||
SetSendMsgStatus(ctx context.Context, userID string, status int32) error
|
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||||
GetSendMsgStatus(ctx context.Context, userID string) (int32, error)
|
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||||
MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error
|
MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error
|
||||||
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
|
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
|
||||||
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
|
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
|
||||||
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
|
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
}
|
GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error)
|
||||||
type msgDatabase struct {
|
|
||||||
mgo unRelationTb.MsgDocModelInterface
|
|
||||||
cache cache.Cache
|
|
||||||
msg unRelationTb.MsgDocModel
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
|
||||||
|
return &MsgDatabase{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type MsgDatabase struct {
|
||||||
|
mgo unRelationTb.MsgDocModelInterface
|
||||||
|
cache cache.MsgCache
|
||||||
|
msg unRelationTb.MsgDocModel
|
||||||
|
ExtendMsg unRelationTb.ExtendMsgSetModelInterface
|
||||||
|
rdb redis.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
|
||||||
|
r := make(map[string]unRelationTb.KeyValueModel)
|
||||||
|
for key, value := range reactionExtensionList {
|
||||||
|
r[key] = unRelationTb.KeyValueModel{
|
||||||
|
TypeKey: value.TypeKey,
|
||||||
|
Value: value.Value,
|
||||||
|
LatestUpdateTime: value.LatestUpdateTime,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
||||||
|
return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
|
||||||
|
return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
||||||
|
return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
|
||||||
|
return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
|
||||||
|
return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
|
||||||
|
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||||
|
return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
||||||
|
extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
|
||||||
|
}
|
||||||
|
reactionExtensionList := make(map[string]*pbMsg.KeyValueResp)
|
||||||
|
for key, model := range extendMsg.ReactionExtensionList {
|
||||||
|
reactionExtensionList[key] = &pbMsg.KeyValueResp{
|
||||||
|
KeyValue: &sdkws.KeyValue{
|
||||||
|
TypeKey: model.TypeKey,
|
||||||
|
Value: model.Value,
|
||||||
|
LatestUpdateTime: model.LatestUpdateTime,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pbMsg.ExtendMsg{
|
||||||
|
ReactionExtensionList: reactionExtensionList,
|
||||||
|
ClientMsgID: extendMsg.ClientMsgID,
|
||||||
|
MsgFirstModifyTime: extendMsg.MsgFirstModifyTime,
|
||||||
|
AttachedInfo: extendMsg.AttachedInfo,
|
||||||
|
Ex: extendMsg.Ex,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
||||||
|
return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
||||||
|
return db.cache.SetSendMsgStatus(ctx, id, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
|
||||||
|
return db.cache.GetSendMsgStatus(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
|
||||||
//TODO implement me
|
//TODO implement me
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
|
func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
|
||||||
//TODO implement me
|
return db.cache.GetUserMaxSeq(ctx, userID)
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
|
||||||
//TODO implement me
|
return db.cache.GetUserMinSeq(ctx, userID)
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
|
||||||
//TODO implement me
|
return db.cache.GetGroupMaxSeq(ctx, groupID)
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
|
||||||
//TODO implement me
|
return db.cache.GetGroupMinSeq(ctx, groupID)
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
|
func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
|
||||||
//TODO implement me
|
seqMsg, _, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
|
||||||
panic("implement me")
|
return seqMsg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
|
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase {
|
|
||||||
return &msgDatabase{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
|
|
||||||
//newTime := utils.GetCurrentTimestampByMill()
|
//newTime := utils.GetCurrentTimestampByMill()
|
||||||
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
|
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
|
||||||
return errors.New("too large")
|
return errors.New("too large")
|
||||||
@ -337,11 +276,11 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
|
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
|
||||||
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
|
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
|
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
|
||||||
//newTime := utils.GetCurrentTimestampByMill()
|
//newTime := utils.GetCurrentTimestampByMill()
|
||||||
lenList := len(msgList)
|
lenList := len(msgList)
|
||||||
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
|
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
|
||||||
@ -393,7 +332,7 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
|
|||||||
return lastMaxSeq, utils.Wrap(err, "")
|
return lastMaxSeq, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
|
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
|
||||||
sortkeys.Int64s(seqs)
|
sortkeys.Int64s(seqs)
|
||||||
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
|
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
@ -414,7 +353,7 @@ func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i
|
|||||||
return totalUnExistSeqs, nil
|
return totalUnExistSeqs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
|
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
|
||||||
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
|
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -427,7 +366,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s
|
|||||||
return unExistSeqs, nil
|
return unExistSeqs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
|
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
|
||||||
doc, err := db.mgo.FindOneByDocID(ctx, docID)
|
doc, err := db.mgo.FindOneByDocID(ctx, docID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
@ -458,7 +397,7 @@ func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
|
|||||||
return seqMsgs, indexes, unExistSeqs, nil
|
return seqMsgs, indexes, unExistSeqs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||||
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
|
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -466,7 +405,7 @@ func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
|
|||||||
return db.unmarshalMsg(msgInfo)
|
return db.unmarshalMsg(msgInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
||||||
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
|
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -474,7 +413,7 @@ func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb
|
|||||||
return db.unmarshalMsg(msgInfo)
|
return db.unmarshalMsg(msgInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
|
func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
|
||||||
msgPb = &sdkws.MsgData{}
|
msgPb = &sdkws.MsgData{}
|
||||||
err = proto.Unmarshal(msgInfo.Msg, msgPb)
|
err = proto.Unmarshal(msgInfo.Msg, msgPb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -483,7 +422,7 @@ func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
|
|||||||
return msgPb, nil
|
return msgPb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
|
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
var hasSeqs []int64
|
var hasSeqs []int64
|
||||||
singleCount := 0
|
singleCount := 0
|
||||||
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
||||||
@ -524,8 +463,8 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
|
|||||||
return seqMsg, nil
|
return seqMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
|
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != redis.Nil {
|
if err != redis.Nil {
|
||||||
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||||
@ -545,8 +484,8 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i
|
|||||||
return successMsgs, nil
|
return successMsgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
|
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != redis.Nil {
|
if err != redis.Nil {
|
||||||
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||||
@ -566,7 +505,7 @@ func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin
|
|||||||
return successMsgs, nil
|
return successMsgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
|
func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
|
||||||
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
|
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -575,7 +514,7 @@ func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
|
|||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
||||||
var delStruct delMsgRecursionStruct
|
var delStruct delMsgRecursionStruct
|
||||||
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
|
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -603,7 +542,7 @@ func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
||||||
var delStruct delMsgRecursionStruct
|
var delStruct delMsgRecursionStruct
|
||||||
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
|
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -629,7 +568,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
|
|||||||
// seq 70
|
// seq 70
|
||||||
// set minSeq 21
|
// set minSeq 21
|
||||||
// recursion 删除list并且返回设置的最小seq
|
// recursion 删除list并且返回设置的最小seq
|
||||||
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
|
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
|
||||||
// find from oldest list
|
// find from oldest list
|
||||||
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
|
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
|
||||||
if err != nil || msgs.DocID == "" {
|
if err != nil || msgs.DocID == "" {
|
||||||
@ -691,10 +630,10 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
|
|||||||
//log.NewDebug(operationID, sourceID, "continue to", delStruct)
|
//log.NewDebug(operationID, sourceID, "continue to", delStruct)
|
||||||
// 继续递归 index+1
|
// 继续递归 index+1
|
||||||
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
|
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
|
||||||
return seq, err
|
return seq, utils.Wrap(err, "deleteMsg failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
||||||
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
|
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, 0, 0, err
|
return 0, 0, 0, 0, err
|
||||||
@ -711,7 +650,7 @@ func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
|
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
|
||||||
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
|
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, 0, err
|
return 0, 0, 0, err
|
||||||
@ -723,7 +662,7 @@ func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||||
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
|
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
@ -745,10 +684,10 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
||||||
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
|
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
||||||
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
|
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ type PushInterface interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PushDataBase struct {
|
type PushDataBase struct {
|
||||||
cache cache.Cache
|
cache cache.MsgCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error {
|
func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error {
|
||||||
|
@ -21,6 +21,27 @@ func CopyStructFields(a interface{}, b interface{}, fields ...string) (err error
|
|||||||
return copier.Copy(a, b)
|
return copier.Copy(a, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Wrap1(err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return Wrap(err, "")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Wrap2[T any](a T, err error) (T, error) {
|
||||||
|
if err != nil {
|
||||||
|
return a, Wrap(err, "")
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Wrap3[T any, V any](a T, b V, err error) (T, V, error) {
|
||||||
|
if err != nil {
|
||||||
|
return a, b, Wrap(err, "")
|
||||||
|
}
|
||||||
|
return a, b, nil
|
||||||
|
}
|
||||||
|
|
||||||
func Wrap(err error, message string) error {
|
func Wrap(err error, message string) error {
|
||||||
return errors.Wrap(err, "==> "+printCallerNameAndLine()+message)
|
return errors.Wrap(err, "==> "+printCallerNameAndLine()+message)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user