mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-26 21:22:16 +08:00
redis msg cache
This commit is contained in:
parent
9d64781fc7
commit
1219b47c79
@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
|
|||||||
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
|
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
|
||||||
var role int32
|
var role int32
|
||||||
if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) {
|
if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) {
|
||||||
switch msgs[0].SessionType {
|
sessionType := msgs[0].SessionType
|
||||||
|
switch sessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil {
|
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
|
|||||||
role = member.RoleLevel
|
role = member.RoleLevel
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported")
|
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
now := time.Now().UnixMilli()
|
now := time.Now().UnixMilli()
|
||||||
|
|||||||
2
pkg/common/storage/cache/msg.go
vendored
2
pkg/common/storage/cache/msg.go
vendored
@ -25,5 +25,5 @@ type MsgCache interface {
|
|||||||
|
|
||||||
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
|
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
|
||||||
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
|
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
|
||||||
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error
|
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error
|
||||||
}
|
}
|
||||||
|
|||||||
6
pkg/common/storage/cache/redis/msg.go
vendored
6
pkg/common/storage/cache/redis/msg.go
vendored
@ -79,16 +79,16 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error {
|
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error {
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if msg == nil || msg.Seq <= 0 {
|
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
data, err := json.Marshal(msg)
|
data, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Seq), string(data), msgCacheTimeout); err != nil {
|
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -260,6 +260,9 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) {
|
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) {
|
||||||
|
if msg == nil || msg.Msg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if msg.IsRead {
|
if msg.IsRead {
|
||||||
msg.Msg.IsRead = true
|
msg.Msg.IsRead = true
|
||||||
}
|
}
|
||||||
|
|||||||
@ -252,7 +252,12 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
|
|||||||
userSeqMap[m.SendID] = m.Seq
|
userSeqMap[m.SendID] = m.Seq
|
||||||
seqs = append(seqs, m.Seq)
|
seqs = append(seqs, m.Seq)
|
||||||
}
|
}
|
||||||
if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, convert.MsgPb2DB)); err != nil {
|
msgToDB := func(msg *sdkws.MsgData) *model.MsgInfoModel {
|
||||||
|
return &model.MsgInfoModel{
|
||||||
|
Msg: convert.MsgPb2DB(msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, msgToDB)); err != nil {
|
||||||
return 0, false, nil, err
|
return 0, false, nil, err
|
||||||
}
|
}
|
||||||
return lastMaxSeq, isNew, userSeqMap, nil
|
return lastMaxSeq, isNew, userSeqMap, nil
|
||||||
|
|||||||
@ -24,7 +24,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Msg interface {
|
type Msg interface {
|
||||||
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error
|
//PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error
|
||||||
Create(ctx context.Context, model *model.MsgDocModel) error
|
Create(ctx context.Context, model *model.MsgDocModel) error
|
||||||
UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
|
UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
|
||||||
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
|
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user