mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-26 11:36:44 +08:00
msg
This commit is contained in:
parent
b4443c843a
commit
eac2619c33
@ -14,107 +14,50 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
|
func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
|
||||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
resp = &msg.SetMessageReactionExtensionsResp{}
|
||||||
var rResp msg.SetMessageReactionExtensionsResp
|
//resp.ClientMsgID = req.ClientMsgID
|
||||||
rResp.ClientMsgID = req.ClientMsgID
|
//resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||||
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
|
||||||
callbackResp := notification.callbackSetMessageReactionExtensions(req)
|
if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil {
|
||||||
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
return nil, err
|
||||||
rResp.ErrCode = int32(callbackResp.ErrCode)
|
|
||||||
rResp.ErrMsg = callbackResp.ErrMsg
|
|
||||||
for _, value := range req.ReactionExtensionList {
|
|
||||||
temp := new(msg.KeyValueResp)
|
|
||||||
temp.KeyValue = value
|
|
||||||
temp.ErrMsg = callbackResp.ErrMsg
|
|
||||||
temp.ErrCode = 100
|
|
||||||
rResp.Result = append(rResp.Result, temp)
|
|
||||||
}
|
|
||||||
return &rResp, nil
|
|
||||||
}
|
}
|
||||||
//if ExternalExtension
|
//if ExternalExtension
|
||||||
if req.IsExternalExtensions {
|
if req.IsExternalExtensions {
|
||||||
var isHistory bool
|
resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||||
if req.IsReact {
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false)
|
||||||
isHistory = false
|
return resp, nil
|
||||||
} else {
|
|
||||||
isHistory = true
|
|
||||||
}
|
|
||||||
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
|
|
||||||
rResp.Result = callbackResp.ResultReactionExtensionList
|
|
||||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
|
|
||||||
return &rResp, nil
|
|
||||||
}
|
}
|
||||||
for _, v := range callbackResp.ResultReactionExtensionList {
|
isExists, err := m.MsgInterface.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType)
|
||||||
if v.ErrCode == 0 {
|
|
||||||
req.ReactionExtensionList[v.KeyValue.TypeKey] = v.KeyValue
|
|
||||||
} else {
|
|
||||||
delete(req.ReactionExtensionList, v.KeyValue.TypeKey)
|
|
||||||
rResp.Result = append(rResp.Result, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rResp.ErrCode = 100
|
return nil, err
|
||||||
rResp.ErrMsg = err.Error()
|
|
||||||
for _, value := range req.ReactionExtensionList {
|
|
||||||
temp := new(msg.KeyValueResp)
|
|
||||||
temp.KeyValue = value
|
|
||||||
temp.ErrMsg = err.Error()
|
|
||||||
temp.ErrCode = 100
|
|
||||||
rResp.Result = append(rResp.Result, temp)
|
|
||||||
}
|
|
||||||
return &rResp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isExists {
|
if !isExists {
|
||||||
if !req.IsReact {
|
if !req.IsReact {
|
||||||
log.Debug(req.OperationID, "redis handle firstly", req.String())
|
resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||||
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
|
||||||
for k, v := range req.ReactionExtensionList {
|
for k, v := range req.ReactionExtensionList {
|
||||||
err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
return nil, err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||||
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
if err := m.MsgInterface.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil {
|
||||||
if newerr != nil {
|
return nil, err
|
||||||
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
|
|
||||||
}
|
}
|
||||||
rResp.IsReact = true
|
resp.IsReact = true
|
||||||
_, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
_, err := m.MsgInterface.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rResp.ErrCode = 100
|
return nil, err
|
||||||
rResp.ErrMsg = err.Error()
|
|
||||||
for _, value := range req.ReactionExtensionList {
|
|
||||||
temp := new(msg.KeyValueResp)
|
|
||||||
temp.KeyValue = value
|
|
||||||
temp.ErrMsg = err.Error()
|
|
||||||
temp.ErrCode = 100
|
|
||||||
rResp.Result = append(rResp.Result, temp)
|
|
||||||
}
|
|
||||||
return &rResp, nil
|
|
||||||
}
|
}
|
||||||
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
mongoValue, err := m.MsgInterface.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rResp.ErrCode = 200
|
return nil, err
|
||||||
rResp.ErrMsg = err.Error()
|
|
||||||
for _, value := range req.ReactionExtensionList {
|
|
||||||
temp := new(msg.KeyValueResp)
|
|
||||||
temp.KeyValue = value
|
|
||||||
temp.ErrMsg = err.Error()
|
|
||||||
temp.ErrCode = 100
|
|
||||||
rResp.Result = append(rResp.Result, temp)
|
|
||||||
}
|
|
||||||
return &rResp, nil
|
|
||||||
}
|
}
|
||||||
setValue := make(map[string]*sdkws.KeyValue)
|
setValue := make(map[string]*sdkws.KeyValue)
|
||||||
for k, v := range req.ReactionExtensionList {
|
for k, v := range req.ReactionExtensionList {
|
||||||
@ -123,7 +66,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
|||||||
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
|
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
|
||||||
utils.CopyStructFields(temp, &vv)
|
utils.CopyStructFields(temp, &vv)
|
||||||
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||||
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
|
setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,13 +82,13 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
|||||||
temp.KeyValue = value
|
temp.KeyValue = value
|
||||||
temp.ErrMsg = err.Error()
|
temp.ErrMsg = err.Error()
|
||||||
temp.ErrCode = 100
|
temp.ErrCode = 100
|
||||||
rResp.Result = append(rResp.Result, temp)
|
resp.Result = append(resp.Result, temp)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, value := range setValue {
|
for _, value := range setValue {
|
||||||
temp := new(msg.KeyValueResp)
|
temp := new(msg.KeyValueResp)
|
||||||
temp.KeyValue = value
|
temp.KeyValue = value
|
||||||
rResp.Result = append(rResp.Result, temp)
|
resp.Result = append(resp.Result, temp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||||
@ -160,42 +103,42 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
|||||||
for k, v := range req.ReactionExtensionList {
|
for k, v := range req.ReactionExtensionList {
|
||||||
err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||||
if err != nil && err != go_redis.Nil {
|
if err != nil && err != go_redis.Nil {
|
||||||
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
|
setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
temp := new(sdkws.KeyValue)
|
temp := new(sdkws.KeyValue)
|
||||||
utils.JsonStringToStruct(redisValue, temp)
|
utils.JsonStringToStruct(redisValue, temp)
|
||||||
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||||
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
|
setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||||
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||||
if newerr != nil {
|
if newerr != nil {
|
||||||
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
|
setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !isExists {
|
if !isExists {
|
||||||
if !req.IsReact {
|
if !req.IsReact {
|
||||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true)
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true)
|
||||||
} else {
|
} else {
|
||||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true)
|
||||||
}
|
}
|
||||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String())
|
||||||
return &rResp, nil
|
return &resp, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||||
|
@ -27,7 +27,11 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
|
|||||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackBeforeSetMessageReactionExtResp{}
|
resp := &cbapi.CallbackBeforeSetMessageReactionExtResp{}
|
||||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
if err := http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
setReq.MsgFirstModifyTime = resp.MsgFirstModifyTime
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) error {
|
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) error {
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
package msg
|
package msg
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const GlOBLLOCK = "GLOBAL_LOCK"
|
const GlOBLLOCK = "GLOBAL_LOCK"
|
||||||
|
|
||||||
type MessageLocker interface {
|
type MessageLocker interface {
|
||||||
LockMessageTypeKey(clientMsgID, typeKey string) (err error)
|
LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error)
|
||||||
UnLockMessageTypeKey(clientMsgID string, typeKey string) error
|
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error
|
||||||
LockGlobalMessage(clientMsgID string) (err error)
|
LockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
||||||
UnLockGlobalMessage(clientMsgID string) (err error)
|
UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
||||||
}
|
}
|
||||||
type LockerMessage struct{}
|
type LockerMessage struct{}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
|
||||||
|
resp = &msg.SendMsgResp{}
|
||||||
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
|
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
|
||||||
// callback
|
// callback
|
||||||
if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil {
|
if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil {
|
||||||
|
@ -16,13 +16,14 @@ import (
|
|||||||
|
|
||||||
type msgServer struct {
|
type msgServer struct {
|
||||||
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||||
MsgInterface controller.MsgInterface
|
MsgInterface controller.MsgDatabaseInterface
|
||||||
Group *check.GroupChecker
|
Group *check.GroupChecker
|
||||||
User *check.UserCheck
|
User *check.UserCheck
|
||||||
Conversation *check.ConversationChecker
|
Conversation *check.ConversationChecker
|
||||||
friend *check.FriendChecker
|
friend *check.FriendChecker
|
||||||
*localcache.GroupLocalCache
|
*localcache.GroupLocalCache
|
||||||
black *check.BlackChecker
|
black *check.BlackChecker
|
||||||
|
MessageLocker MessageLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteMsg struct {
|
type deleteMsg struct {
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
"github.com/gogo/protobuf/sortkeys"
|
"github.com/gogo/protobuf/sortkeys"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
pbMsg "Open_IM/pkg/proto/msg"
|
pbMsg "Open_IM/pkg/proto/msg"
|
||||||
"Open_IM/pkg/proto/sdkws"
|
"Open_IM/pkg/proto/sdkws"
|
||||||
@ -22,96 +23,96 @@ import (
|
|||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MsgInterface interface {
|
//type MsgInterface interface {
|
||||||
// 批量插入消息到db
|
// // 批量插入消息到db
|
||||||
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
|
// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
|
||||||
// 刪除redis中消息缓存
|
// // 刪除redis中消息缓存
|
||||||
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
|
// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
|
||||||
// incrSeq然后批量插入缓存
|
// // incrSeq然后批量插入缓存
|
||||||
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
|
// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
|
||||||
// 删除消息 返回不存在的seqList
|
// // 删除消息 返回不存在的seqList
|
||||||
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
|
// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
|
||||||
// 通过seqList获取db中写扩散消息
|
// // 通过seqList获取db中写扩散消息
|
||||||
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
||||||
// 通过seqList获取大群在db里面的消息 没找到返回错误
|
// // 通过seqList获取大群在db里面的消息 没找到返回错误
|
||||||
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
||||||
// 删除用户所有消息/cache/db然后重置seq
|
// // 删除用户所有消息/cache/db然后重置seq
|
||||||
CleanUpUserMsg(ctx context.Context, userID string) error
|
// CleanUpUserMsg(ctx context.Context, userID string) error
|
||||||
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
|
||||||
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
|
// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
|
||||||
// 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
||||||
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
|
// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
|
||||||
// 获取用户 seq mongo和redis
|
// // 获取用户 seq mongo和redis
|
||||||
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||||
// 获取群 seq mongo和redis
|
// // 获取群 seq mongo和redis
|
||||||
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
|
// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
|
||||||
// 设置群用户最小seq 直接调用cache
|
// // 设置群用户最小seq 直接调用cache
|
||||||
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
||||||
// 设置用户最小seq 直接调用cache
|
// // 设置用户最小seq 直接调用cache
|
||||||
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
||||||
|
//
|
||||||
MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error)
|
// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
|
//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
|
||||||
return &MsgController{}
|
// return &MsgController{}
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
type MsgController struct {
|
//type MsgController struct {
|
||||||
database MsgDatabase
|
// database MsgDatabase
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
|
//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
|
||||||
return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
|
// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
|
//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
|
||||||
return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
|
// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
|
//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
|
||||||
return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
|
// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
|
//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
|
||||||
return m.database.DelMsgBySeqs(ctx, userID, seqs)
|
// return m.database.DelMsgBySeqs(ctx, userID, seqs)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
return m.database.GetMsgBySeqs(ctx, userID, seqs)
|
// return m.database.GetMsgBySeqs(ctx, userID, seqs)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
|
// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error {
|
//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error {
|
||||||
return m.database.CleanUpUserMsg(ctx, userID)
|
// return m.database.CleanUpUserMsg(ctx, userID)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
|
||||||
return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
|
// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
|
||||||
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
|
// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
|
||||||
return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
|
// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
|
//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
|
||||||
return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
|
// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
|
||||||
return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
|
// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
|
||||||
return m.database.SetUserMinSeq(ctx, userID, minSeq)
|
// return m.database.SetUserMinSeq(ctx, userID, minSeq)
|
||||||
}
|
//}
|
||||||
|
|
||||||
type MsgDatabaseInterface interface {
|
type MsgDatabaseInterface interface {
|
||||||
// 批量插入消息
|
// 批量插入消息
|
||||||
@ -141,8 +142,14 @@ type MsgDatabaseInterface interface {
|
|||||||
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
|
||||||
// 设置用户最小seq 直接调用cache
|
// 设置用户最小seq 直接调用cache
|
||||||
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
|
||||||
}
|
|
||||||
|
|
||||||
|
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
||||||
|
|
||||||
|
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
||||||
|
|
||||||
|
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
||||||
|
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
|
||||||
|
}
|
||||||
type MsgDatabase struct {
|
type MsgDatabase struct {
|
||||||
mgo unRelationTb.MsgDocModelInterface
|
mgo unRelationTb.MsgDocModelInterface
|
||||||
cache cache.Cache
|
cache cache.Cache
|
||||||
|
Loading…
x
Reference in New Issue
Block a user