mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-26 19:46:57 +08:00
merge
This commit is contained in:
parent
8793271541
commit
4f01e10f65
@ -62,15 +62,15 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||||
for _, msgDataToMQ := range msgFromMQ.Messages {
|
for _, msg := range msgFromMQ.Messages {
|
||||||
isReactionFromCache := utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsReactionFromCache)
|
isReactionFromCache := utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache)
|
||||||
if !isReactionFromCache {
|
if !isReactionFromCache {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx = mcontext.SetOperationID(ctx, operationID)
|
ctx = mcontext.SetOperationID(ctx, operationID)
|
||||||
if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier {
|
if msg.ContentType == constant.ReactionMessageModifier {
|
||||||
notification := &sdkws.ReactionMessageModifierNotification{}
|
notification := &sdkws.ReactionMessageModifierNotification{}
|
||||||
if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil {
|
if err := json.Unmarshal(msg.Content, notification); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if notification.IsExternalExtensions {
|
if notification.IsExternalExtensions {
|
||||||
@ -102,9 +102,9 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama
|
|||||||
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageDeleter {
|
} else if msg.ContentType == constant.ReactionMessageDeleter {
|
||||||
notification := &sdkws.ReactionMessageDeleteNotification{}
|
notification := &sdkws.ReactionMessageDeleteNotification{}
|
||||||
if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil {
|
if err := json.Unmarshal(msg.Content, notification); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
|
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
|
||||||
|
@ -5,9 +5,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
|
||||||
"github.com/go-redis/redis"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||||
@ -16,11 +13,11 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||||
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
||||||
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/go-redis/redis"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -103,7 +100,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
||||||
isStorage := func(msg *sdkws.MsgData) bool {
|
isStorage := func(msg *sdkws.MsgData) bool {
|
||||||
options2 := utils.Options(msg.Options)
|
options2 := utils.Options(msg.Options)
|
||||||
if options2.IsHistory() {
|
if options2.IsHistory() {
|
||||||
@ -120,10 +117,10 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID str
|
|||||||
options := utils.Options(v.message.Options)
|
options := utils.Options(v.message.Options)
|
||||||
if options.IsNotification() {
|
if options.IsNotification() {
|
||||||
// 原通知
|
// 原通知
|
||||||
notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ)
|
notificationMsg := proto.Clone(v.message).(*sdkws.MsgData)
|
||||||
if options.IsSendMsg() {
|
if options.IsSendMsg() {
|
||||||
// 消息
|
// 消息
|
||||||
v.message.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false))
|
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false))
|
||||||
storageMsgList = append(storageMsgList, v.message)
|
storageMsgList = append(storageMsgList, v.message)
|
||||||
}
|
}
|
||||||
if isStorage(notificationMsg) {
|
if isStorage(notificationMsg) {
|
||||||
@ -145,7 +142,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID str
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
||||||
och.toPushTopic(ctx, conversationID, notStorageList)
|
och.toPushTopic(ctx, conversationID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, conversationID, storageList)
|
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, conversationID, storageList)
|
||||||
@ -159,22 +156,22 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) {
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
|
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
||||||
och.toPushTopic(ctx, conversationID, notStorageList)
|
och.toPushTopic(ctx, conversationID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
var currentMaxSeq int64
|
var currentMaxSeq int64
|
||||||
var err error
|
var err error
|
||||||
if storageList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
if storageList[0].SessionType == constant.SuperGroupChatType {
|
||||||
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, conversationID)
|
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, conversationID)
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
|
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
|
||||||
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0]); err != nil {
|
||||||
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -182,7 +179,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con
|
|||||||
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, conversationID)
|
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, conversationID)
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
log.ZInfo(ctx, "single chat first create conversation", "conversationID", conversationID)
|
log.ZInfo(ctx, "single chat first create conversation", "conversationID", conversationID)
|
||||||
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0]); err != nil {
|
||||||
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,7 +274,6 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
|
||||||
switch singleOpt {
|
switch singleOpt {
|
||||||
case constant.ReceiveMessage:
|
case constant.ReceiveMessage:
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -311,30 +310,29 @@ func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq {
|
|||||||
return &msg.SendMsgReq{MsgData: &msgData}
|
return &msg.SendMsgReq{MsgData: &msgData}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, wg *sync.WaitGroup) error {
|
func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, req *msg.SendMsgReq, wg *sync.WaitGroup) error {
|
||||||
msgToMQGroup := msg.MsgDataToMQ{MsgData: groupPB.MsgData}
|
|
||||||
tempOptions := make(map[string]bool, 1)
|
tempOptions := make(map[string]bool, 1)
|
||||||
for k, v := range groupPB.MsgData.Options {
|
for k, v := range req.MsgData.Options {
|
||||||
tempOptions[k] = v
|
tempOptions[k] = v
|
||||||
}
|
}
|
||||||
for _, v := range list {
|
for _, v := range list {
|
||||||
groupPB.MsgData.RecvID = v
|
req.MsgData.RecvID = v
|
||||||
options := make(map[string]bool, 1)
|
options := make(map[string]bool, 1)
|
||||||
for k, v := range tempOptions {
|
for k, v := range tempOptions {
|
||||||
options[k] = v
|
options[k] = v
|
||||||
}
|
}
|
||||||
groupPB.MsgData.Options = options
|
req.MsgData.Options = options
|
||||||
conversationID := utils.GetConversationIDBySessionType(constant.GroupChatType, groupPB.MsgData.GroupID)
|
conversationID := utils.GetConversationIDBySessionType(constant.GroupChatType, req.MsgData.GroupID)
|
||||||
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, v, conversationID, constant.GroupChatType, groupPB)
|
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, v, conversationID, constant.GroupChatType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if isSend {
|
if isSend {
|
||||||
if v == "" || groupPB.MsgData.SendID == "" {
|
if v == "" || req.MsgData.SendID == "" {
|
||||||
return errs.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty")
|
return errs.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty")
|
||||||
}
|
}
|
||||||
err := m.MsgDatabase.MsgToMQ(ctx, v, &msgToMQGroup)
|
err := m.MsgDatabase.MsgToMQ(ctx, v, req.MsgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
return err
|
return err
|
||||||
|
@ -21,8 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
|
|||||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
|
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
|
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData)
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -32,27 +31,26 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
|
|||||||
}
|
}
|
||||||
|
|
||||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
|
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
|
||||||
resp.SendTime = msgToMQSingle.MsgData.SendTime
|
resp.SendTime = req.MsgData.SendTime
|
||||||
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
|
resp.ServerMsgID = req.MsgData.ServerMsgID
|
||||||
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
|
resp.ClientMsgID = req.MsgData.ClientMsgID
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
|
||||||
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
|
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData)
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
|
if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle)
|
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp = &msg.SendMsgResp{
|
resp = &msg.SendMsgResp{
|
||||||
ServerMsgID: msgToMQSingle.MsgData.ServerMsgID,
|
ServerMsgID: req.MsgData.ServerMsgID,
|
||||||
ClientMsgID: msgToMQSingle.MsgData.ClientMsgID,
|
ClientMsgID: req.MsgData.ClientMsgID,
|
||||||
SendTime: msgToMQSingle.MsgData.SendTime,
|
SendTime: req.MsgData.SendTime,
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@ -67,15 +65,14 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
|
|
||||||
if isSend {
|
if isSend {
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle)
|
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.ErrInternalServer.Wrap("insert to mq")
|
return nil, errs.ErrInternalServer.Wrap("insert to mq")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
|
if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle)
|
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.ErrInternalServer.Wrap("insert to mq")
|
return nil, errs.ErrInternalServer.Wrap("insert to mq")
|
||||||
}
|
}
|
||||||
@ -86,9 +83,9 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
|
|||||||
}
|
}
|
||||||
promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter)
|
promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter)
|
||||||
resp = &msg.SendMsgResp{
|
resp = &msg.SendMsgResp{
|
||||||
ServerMsgID: msgToMQSingle.MsgData.ServerMsgID,
|
ServerMsgID: req.MsgData.ServerMsgID,
|
||||||
ClientMsgID: msgToMQSingle.MsgData.ClientMsgID,
|
ClientMsgID: req.MsgData.ClientMsgID,
|
||||||
SendTime: msgToMQSingle.MsgData.SendTime,
|
SendTime: req.MsgData.SendTime,
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
6
pkg/common/db/cache/msg.go
vendored
6
pkg/common/db/cache/msg.go
vendored
@ -457,15 +457,15 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
|
func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
|
||||||
return errs.Wrap(c.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
|
return errs.Wrap(c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
|
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
|
||||||
return utils.Wrap2(c.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
|
return utils.Wrap2(c.rdb.Get(ctx, fcmToken+account+":"+strconv.Itoa(platformID)).Result())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
|
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
|
||||||
return errs.Wrap(c.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
|
return errs.Wrap(c.rdb.Del(ctx, fcmToken+account+":"+strconv.Itoa(platformID)).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
||||||
|
@ -34,7 +34,7 @@ type MsgDatabase interface {
|
|||||||
// 刪除redis中消息缓存
|
// 刪除redis中消息缓存
|
||||||
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error
|
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error
|
||||||
// incrSeq然后批量插入缓存
|
// incrSeq然后批量插入缓存
|
||||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error)
|
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) (int64, error)
|
||||||
// incrSeq通知seq然后批量插入缓存
|
// incrSeq通知seq然后批量插入缓存
|
||||||
NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error)
|
NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error)
|
||||||
// 删除消息 返回不存在的seqList
|
// 删除消息 返回不存在的seqList
|
||||||
@ -190,7 +190,7 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.Ms
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error {
|
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error {
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: conversationID, Messages: messages})
|
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: conversationID, Messages: messages})
|
||||||
return err
|
return err
|
||||||
@ -207,9 +207,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, conversationID string, m
|
|||||||
return partition, offset, err
|
return partition, offset, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
|
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, SourceID: sourceID, Messages: messages})
|
_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: sourceID, MsgData: messages})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -198,11 +198,11 @@ func (db *notificationDatabase) MsgToModifyMQ(ctx context.Context, aggregationID
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||||
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq, SourceID: key}
|
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}
|
||||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg)
|
partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &mqPushMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
|
log.ZError(ctx, "MsgToPushMQ", err, "conversationID", conversationID, "msg2mq", msg2mq)
|
||||||
}
|
}
|
||||||
return partition, offset, err
|
return partition, offset, err
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ func (m NotificationDocModel) superGroupIndexGen(groupID string, seqSuffix int64
|
|||||||
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m NotificationDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
|
func (m NotificationDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
||||||
t := make(map[string][]int64)
|
t := make(map[string][]int64)
|
||||||
for i := 0; i < len(seqs); i++ {
|
for i := 0; i < len(seqs); i++ {
|
||||||
docID := m.GetDocID(conversationID, seqs[i])
|
docID := m.GetDocID(conversationID, seqs[i])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user