Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
Gordon 2023-05-25 20:57:46 +08:00
commit f34996f6e3
21 changed files with 2057 additions and 2119 deletions

View File

@ -119,18 +119,6 @@ func (m *Message) PullMsgBySeqs(c *gin.Context) {
a2r.Call(msg.MsgClient.PullMessageBySeqs, m.client, c) a2r.Call(msg.MsgClient.PullMessageBySeqs, m.client, c)
} }
func (m *Message) DelMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.DelMsgs, m.client, c)
}
func (m *Message) DelSuperGroupMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.DelSuperGroupMsg, m.client, c)
}
func (m *Message) ClearMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.ClearMsg, m.client, c)
}
func (m *Message) RevokeMsg(c *gin.Context) { func (m *Message) RevokeMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.RevokeMsg, m.client, c) a2r.Call(msg.MsgClient.RevokeMsg, m.client, c)
} }

View File

@ -139,9 +139,7 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry, rdb redis.Universal
msgGroup.POST("/newest_seq", m.GetSeq) msgGroup.POST("/newest_seq", m.GetSeq)
msgGroup.POST("/send_msg", m.SendMessage) msgGroup.POST("/send_msg", m.SendMessage)
msgGroup.POST("/pull_msg_by_seq", m.PullMsgBySeqs) msgGroup.POST("/pull_msg_by_seq", m.PullMsgBySeqs)
msgGroup.POST("/del_msg", m.DelMsg) // todo del msg route
msgGroup.POST("/del_super_group_msg", m.DelSuperGroupMsg)
msgGroup.POST("/clear_msg", m.ClearMsg)
msgGroup.POST("/revoke_msg", m.RevokeMsg) msgGroup.POST("/revoke_msg", m.RevokeMsg)
msgGroup.POST("/batch_send_msg", m.ManagementBatchSendMsg) msgGroup.POST("/batch_send_msg", m.ManagementBatchSendMsg)

View File

@ -2,9 +2,6 @@ package msgtransfer
import ( import (
"context" "context"
"encoding/json"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@ -66,36 +63,36 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs)
continue continue
} }
case constant.MsgRevokeNotification: //case constant.MsgRevokeNotification:
var elem sdkws.NotificationElem // var elem sdkws.NotificationElem
if err := json.Unmarshal(v.Content, &elem); err != nil { // if err := json.Unmarshal(v.Content, &elem); err != nil {
log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) // log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content))
continue // continue
} // }
var tips sdkws.RevokeMsgTips // var tips sdkws.RevokeMsgTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { // if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) // log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content))
continue // continue
} // }
msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) // msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq})
if err != nil { // if err != nil {
log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) // log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
continue // continue
} // }
if len(msgs) == 0 { // if len(msgs) == 0 {
log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) // log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq)
continue // continue
} // }
msgs[0].Content = []byte(elem.Detail) // msgs[0].Content = []byte(elem.Detail)
data, err := proto.Marshal(msgs[0]) // data, err := proto.Marshal(msgs[0])
if err != nil { // if err != nil {
log.ZError(ctx, "proto.Marshal MsgData", err) // log.ZError(ctx, "proto.Marshal MsgData", err)
continue // continue
} // }
if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { // if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil {
log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) // log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
continue // continue
} // }
} }
} }
} }

View File

@ -1217,23 +1217,32 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
return nil, errs.ErrArgs.Wrap("invalid role level") return nil, errs.ErrArgs.Wrap("invalid role level")
} }
} }
opMember, ok := memberMap[[...]string{member.GroupID, opUserID}]
if !ok {
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("user %s not in group %s", opUserID, member.GroupID))
}
if member.UserID == opUserID { if member.UserID == opUserID {
if member.RoleLevel != nil { if member.RoleLevel != nil {
return nil, errs.ErrNoPermission.Wrap("can not change self role level") return nil, errs.ErrNoPermission.Wrap("can not change self role level")
} }
continue continue
} }
opMember, ok := memberMap[[...]string{member.GroupID, opUserID}] if opMember.RoleLevel == constant.GroupOrdinaryUsers {
if !ok { return nil, errs.ErrNoPermission.Wrap("ordinary users can not change other role level")
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("user %s not in group %s", opUserID, member.GroupID))
} }
dbMember, ok := memberMap[[...]string{member.GroupID, member.UserID}] dbMember, ok := memberMap[[...]string{member.GroupID, member.UserID}]
if !ok { if !ok {
return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("user %s not in group %s", member.UserID, member.GroupID)) return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("user %s not in group %s", member.UserID, member.GroupID))
} }
if opMember.RoleLevel == constant.GroupOrdinaryUsers { //if opMember.RoleLevel == constant.GroupOwner {
return nil, errs.ErrNoPermission.Wrap("ordinary users can not change other role level") // continue
} //}
//if dbMember.RoleLevel == constant.GroupOwner {
// return nil, errs.ErrNoPermission.Wrap("change group owner")
//}
//if opMember.RoleLevel == constant.GroupAdmin && dbMember.RoleLevel == constant.GroupAdmin {
// return nil, errs.ErrNoPermission.Wrap("admin can not change other admin role info")
//}
switch opMember.RoleLevel { switch opMember.RoleLevel {
case constant.GroupOrdinaryUsers: case constant.GroupOrdinaryUsers:
return nil, errs.ErrNoPermission.Wrap("ordinary users can not change other role level") return nil, errs.ErrNoPermission.Wrap("ordinary users can not change other role level")
@ -1241,6 +1250,9 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
if dbMember.RoleLevel != constant.GroupOrdinaryUsers { if dbMember.RoleLevel != constant.GroupOrdinaryUsers {
return nil, errs.ErrNoPermission.Wrap("admin can not change other role level") return nil, errs.ErrNoPermission.Wrap("admin can not change other role level")
} }
if member.RoleLevel != nil {
return nil, errs.ErrNoPermission.Wrap("admin can not change other role level")
}
case constant.GroupOwner: case constant.GroupOwner:
//if member.RoleLevel != nil && member.RoleLevel.Value == constant.GroupOwner { //if member.RoleLevel != nil && member.RoleLevel.Value == constant.GroupOwner {
// return nil, errs.ErrNoPermission.Wrap("owner only one") // return nil, errs.ErrNoPermission.Wrap("owner only one")

View File

@ -38,7 +38,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin
} }
} }
func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
return nil return nil
} }
@ -50,7 +50,7 @@ func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) er
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg)
} }
func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
return nil return nil
} }
@ -62,7 +62,7 @@ func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
} }
func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
return nil return nil
} }
@ -74,7 +74,7 @@ func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
} }
func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil return nil
} }
@ -86,7 +86,7 @@ func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) erro
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
} }
func CallbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text {
return nil return nil
} }

View File

@ -3,30 +3,34 @@ package msg
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
) )
func (m *msgServer) DelMsgs(ctx context.Context, req *msg.DelMsgsReq) (*msg.DelMsgsResp, error) { func (m *msgServer) getMinSeqs(maxSeqs map[string]int64) map[string]int64 {
// if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.Seqs); err != nil { minSeqs := make(map[string]int64)
// return nil, err for k, v := range maxSeqs {
// } minSeqs[k] = v + 1
return &msg.DelMsgsResp{}, nil }
return minSeqs
} }
func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroupMsgReq) (*msg.DelSuperGroupMsgResp, error) { func (m *msgServer) ClearConversationsMsg(ctx context.Context, req *msg.ClearConversationsMsgReq) (*msg.ClearConversationsMsgResp, error) {
resp := &msg.DelSuperGroupMsgResp{} if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err return nil, err
} }
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, 0); err != nil { maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, req.ConversationIDs)
if err != nil {
return nil, err return nil, err
} }
return resp, nil if err := m.MsgDatabase.SetUserConversationsMinSeqs(ctx, req.UserID, m.getMinSeqs(maxSeqs)); err != nil {
return nil, err
}
return &msg.ClearConversationsMsgResp{}, nil
} }
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) { func (m *msgServer) UserClearAllMsg(ctx context.Context, req *msg.UserClearAllMsgReq) (*msg.UserClearAllMsgResp, error) {
resp := &msg.ClearMsgResp{}
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
@ -34,6 +38,32 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
if err != nil { if err != nil {
return nil, err return nil, err
} }
m.MsgDatabase.CleanUpUserConversationsMsgs(ctx, req.UserID, conversationIDs) maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
return resp, nil if err != nil {
return nil, err
}
if err := m.MsgDatabase.SetUserConversationsMinSeqs(ctx, req.UserID, m.getMinSeqs(maxSeqs)); err != nil {
return nil, err
}
return &msg.UserClearAllMsgResp{}, nil
}
func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*msg.DeleteMsgsResp, error) {
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
return &msg.DeleteMsgsResp{}, nil
}
func (m *msgServer) DeleteMsgPhysicalBySeq(ctx context.Context, req *msg.DeleteMsgPhysicalBySeqReq) (*msg.DeleteMsgPhysicalBySeqResp, error) {
return &msg.DeleteMsgPhysicalBySeqResp{}, nil
}
func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhysicalReq) (*msg.DeleteMsgPhysicalResp, error) {
for _, conversationID := range req.ConversationIDs {
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, req.RemainTime); err != nil {
log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err)
}
}
return &msg.DeleteMsgPhysicalResp{}, nil
} }

View File

@ -11,7 +11,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
) )
func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error { func callbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error {
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil return nil
} }
@ -35,7 +35,7 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
return nil return nil
} }
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error { func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error {
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil return nil
} }
@ -54,7 +54,7 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionE
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
} }
func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error { func callbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error {
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil return nil
} }
@ -70,7 +70,7 @@ func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.G
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
} }
func CallbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error { func callbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error {
req := &cbapi.CallbackAddMessageReactionExtReq{ req := &cbapi.CallbackAddMessageReactionExtReq{
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand, CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,

View File

@ -2,6 +2,7 @@ package msg
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
@ -30,7 +31,7 @@ func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgDa
return req.MsgData, nil return req.MsgData, nil
} }
func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String()) log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String())
return nil, err return nil, err
} }
@ -39,18 +40,13 @@ func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.Msg
func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
switch req.MsgData.SessionType { switch req.MsgData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
if err := CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { if err := callbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String()) log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String())
return nil, err return nil, err
} }
case constant.GroupChatType:
if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String())
return nil, err
}
case constant.NotificationChatType: case constant.NotificationChatType:
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { if err := callbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String()) log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String())
return nil, err return nil, err
} }

View File

@ -5,11 +5,13 @@ import (
"encoding/json" "encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"time"
) )
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
@ -22,9 +24,16 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if req.RecvID != "" && req.GroupID != "" { if req.RecvID != "" && req.GroupID != "" {
return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time") return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time")
} }
if req.Seq < 0 {
return nil, errs.ErrArgs.Wrap("seq is invalid")
}
if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil {
return nil, err return nil, err
} }
user, err := m.User.GetUserInfo(ctx, req.UserID)
if err != nil {
return nil, err
}
var sessionType int32 var sessionType int32
var conversationID string var conversationID string
if req.GroupID == "" { if req.GroupID == "" {
@ -34,6 +43,46 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
sessionType = constant.SuperGroupChatType sessionType = constant.SuperGroupChatType
conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID) conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID)
} }
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, conversationID, []int64{req.Seq})
if err != nil {
return nil, err
}
if len(msgs) == 0 {
return nil, errs.ErrRecordNotFound.Wrap("msg not found")
}
sendID := msgs[0].SendID
if !tokenverify.IsAppManagerUid(ctx) {
if req.GroupID == "" {
if req.UserID != sendID {
return nil, errs.ErrNoPermission.Wrap("no permission")
}
} else {
members, err := m.Group.GetGroupMemberInfoMap(ctx, req.GroupID, utils.Distinct([]string{req.UserID, sendID}), true)
if err != nil {
return nil, err
}
if req.UserID != sendID {
roleLevel := members[req.UserID].RoleLevel
switch members[req.UserID].RoleLevel {
case constant.GroupOwner:
case constant.GroupAdmin:
if roleLevel != constant.GroupOrdinaryUsers {
return nil, errs.ErrNoPermission.Wrap("no permission")
}
default:
return nil, errs.ErrNoPermission.Wrap("no permission")
}
}
}
}
err = m.MsgDatabase.RevokeMsg(ctx, conversationID, req.Seq, &unRelationTb.RevokeModel{
UserID: req.UserID,
Nickname: user.Nickname,
Time: time.Now().UnixMilli(),
})
if err != nil {
return nil, err
}
tips := sdkws.RevokeMsgTips{ tips := sdkws.RevokeMsgTips{
RevokerUserID: req.UserID, RevokerUserID: req.UserID,
ClientMsgID: "", ClientMsgID: "",

View File

@ -12,6 +12,28 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
) )
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, errs.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.SuperGroupChatType:
return m.sendMsgSuperGroupChat(ctx, req)
default:
return nil, errs.ErrArgs.Wrap("unknown sessionType")
}
}
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) {
resp = &pbMsg.SendMsgResp{} resp = &pbMsg.SendMsgResp{}
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
@ -23,7 +45,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = CallbackAfterSendGroupMsg(ctx, req); err != nil { if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
log.ZError(ctx, "CallbackAfterSendGroupMsg", err) log.ZError(ctx, "CallbackAfterSendGroupMsg", err)
} }
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
@ -69,7 +91,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter) promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
return nil, err return nil, err
} }
err = CallbackAfterSendSingleMsg(ctx, req) err = callbackAfterSendSingleMsg(ctx, req)
if err != nil && err != errs.ErrCallbackContinue { if err != nil && err != errs.ErrCallbackContinue {
return nil, err return nil, err
} }

View File

@ -3,21 +3,15 @@ package msg
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -102,75 +96,3 @@ func (m *msgServer) initPrometheus() {
prome.NewWorkSuperGroupChatMsgProcessSuccessCounter() prome.NewWorkSuperGroupChatMsgProcessSuccessCounter()
prome.NewWorkSuperGroupChatMsgProcessFailedCounter() prome.NewWorkSuperGroupChatMsgProcessFailedCounter()
} }
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, errs.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.SuperGroupChatType:
return m.sendMsgSuperGroupChat(ctx, req)
default:
return nil, errs.ErrArgs.Wrap("unknown sessionType")
}
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs)
return nil, err
}
resp := new(sdkws.GetMaxSeqResp)
resp.MaxSeqs = maxSeqs
return resp, nil
}
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp := &sdkws.PullMessageBySeqsResp{}
resp.Msgs = make(map[string]*sdkws.PullMsgs)
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
for _, seq := range req.SeqRanges {
if !utils.IsNotification(seq.ConversationID) {
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
continue
}
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs}
} else {
var seqs []int64
for i := seq.Begin; i <= seq.End; i++ {
seqs = append(seqs, i)
}
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)
continue
}
resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
}
}
return resp, nil
}

View File

@ -0,0 +1,60 @@
package msg
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp := &sdkws.PullMessageBySeqsResp{}
resp.Msgs = make(map[string]*sdkws.PullMsgs)
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
for _, seq := range req.SeqRanges {
if !utils.IsNotification(seq.ConversationID) {
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
continue
}
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs}
} else {
var seqs []int64
for i := seq.Begin; i <= seq.End; i++ {
seqs = append(seqs, i)
}
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)
continue
}
resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
}
}
return resp, nil
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs)
return nil, err
}
resp := new(sdkws.GetMaxSeqResp)
resp.MaxSeqs = maxSeqs
return resp, nil
}

View File

@ -292,20 +292,3 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
} }
return true, nil return true, nil
} }
func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq {
offlinePushInfo := sdkws.OfflinePushInfo{}
if pb.MsgData.OfflinePushInfo != nil {
offlinePushInfo = *pb.MsgData.OfflinePushInfo
}
msgData := sdkws.MsgData{}
msgData = *pb.MsgData
msgData.OfflinePushInfo = &offlinePushInfo
options := make(map[string]bool, 10)
for key, value := range pb.MsgData.Options {
options[key] = value
}
msgData.Options = options
return &msg.SendMsgReq{MsgData: &msgData}
}

View File

@ -52,7 +52,10 @@ type MsgModel interface {
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
@ -195,6 +198,18 @@ func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationI
return err return err
} }
func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
pipe := c.rdb.Pipeline()
for conversationID, minSeq := range seqs {
err = pipe.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()
if err != nil {
return errs.Wrap(err)
}
}
_, err = pipe.Exec(ctx)
return err
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())

View File

@ -3,6 +3,7 @@ package controller
import ( import (
"fmt" "fmt"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
@ -25,15 +26,13 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"google.golang.org/protobuf/proto"
) )
type CommonMsgDatabase interface { type CommonMsgDatabase interface {
// 批量插入消息 // 批量插入消息
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
// 撤回消息 // 撤回消息
RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
// 刪除redis中消息缓存 // 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
// incrSeq然后批量插入缓存 // incrSeq然后批量插入缓存
@ -57,6 +56,7 @@ type CommonMsgDatabase interface {
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
@ -141,99 +141,192 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
return nil return nil
} }
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error {
if len(msgList) == 0 {
return nil
}
num := db.msg.GetSingleGocMsgNum() num := db.msg.GetSingleGocMsgNum()
currentIndex := currentMaxSeq / num //num = 100
var blockMsgs []*[]*sdkws.MsgData if msgList[0].Msg != nil {
for i, data := range msgList { firstSeq = msgList[0].Msg.Seq
data.Seq = currentMaxSeq + int64(i+1) }
index := data.Seq/num - currentIndex getDocID := func(seq int64) string {
if i == 0 && index == 1 { return conversationID + ":" + strconv.FormatInt(seq/num, 10)
index-- }
currentIndex++ getIndex := func(seq int64) int64 {
return seq % num
}
// 返回值为true表示数据库存在该文档false表示数据库不存在该文档
updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) {
var (
res *mongo.UpdateResult
err error
)
if msg.Msg != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg)
} else if msg.Revoke != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke)
} else if msg.DelList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList)
} else if msg.ReadList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList)
} else {
return false, errs.ErrArgs.Wrap("msg all field is nil")
} }
var block *[]*sdkws.MsgData if err != nil {
if len(blockMsgs) == int(index) { return false, err
var size int64 }
if i == 0 { return res.MatchedCount > 0, nil
size = num - data.Seq%num }
tryUpdate := true
for i := 0; i < len(msgList); i++ {
msg := msgList[i]
seq := firstSeq + int64(i)
docID := getDocID(seq)
if tryUpdate {
matched, err := updateMsgModel(docID, getIndex(seq), msg)
if err != nil {
return err
}
if matched {
continue
}
}
doc := unRelationTb.MsgDocModel{
DocID: docID,
Msg: make([]*unRelationTb.MsgInfoModel, num),
}
var insert int
for j := i; j < len(msgList); j++ {
seq = firstSeq + int64(j)
if getDocID(seq) != docID {
break
}
insert++
doc.Msg[getIndex(seq)] = msgList[j]
}
for i, model := range doc.Msg {
if model == nil {
doc.Msg[i] = &unRelationTb.MsgInfoModel{
DelList: []string{},
ReadList: []string{},
}
} else { } else {
temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num if model.DelList == nil {
if temp >= num { doc.Msg[i].DelList = []string{}
size = num }
} else { if model.ReadList == nil {
size = temp % num doc.Msg[i].ReadList = []string{}
} }
} }
temp := make([]*sdkws.MsgData, 0, size)
block = &temp
blockMsgs = append(blockMsgs, block)
} else {
block = blockMsgs[index]
} }
*block = append(*block, msgList[i]) if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
} if mongo.IsDuplicateKeyError(err) {
create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) i--
if !create { tryUpdate = true
exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) continue
if err != nil { }
return err return err
} }
create = !exist tryUpdate = false
} i += insert - 1
for i, msgs := range blockMsgs {
docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
if create || i != 0 { // 插入
doc := unRelationTb.MsgDocModel{
DocID: docID,
Msg: make([]unRelationTb.MsgInfoModel, num),
}
for i := 0; i < len(doc.Msg); i++ {
doc.Msg[i].ReadList = []string{}
doc.Msg[i].DelList = []string{}
}
for _, msg := range *msgs {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
SendTime: msg.SendTime,
Msg: data,
ReadList: []string{},
DelList: []string{},
}
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
prome.Inc(prome.MsgInsertMongoFailedCounter)
return utils.Wrap(err, "")
}
prome.Inc(prome.MsgInsertMongoSuccessCounter)
} else { // 修改
for _, msg := range *msgs {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
info := unRelationTb.MsgInfoModel{
SendTime: msg.SendTime,
Msg: data,
}
if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
prome.Inc(prome.MsgInsertMongoFailedCounter)
return err
}
prome.Inc(prome.MsgInsertMongoSuccessCounter)
}
}
} }
return nil return nil
} }
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error { func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
index := seq / db.msg.GetSingleGocMsgNum() //num := db.msg.GetSingleGocMsgNum()
docID := db.msg.IndexDocID(conversationID, index) //currentIndex := currentMaxSeq / num
return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg) //var blockMsgs []*[]*sdkws.MsgData
//for i, data := range msgList {
// data.Seq = currentMaxSeq + int64(i+1)
// index := data.Seq/num - currentIndex
// if i == 0 && index == 1 {
// index--
// currentIndex++
// }
// var block *[]*sdkws.MsgData
// if len(blockMsgs) == int(index) {
// var size int64
// if i == 0 {
// size = num - data.Seq%num
// } else {
// temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
// if temp >= num {
// size = num
// } else {
// size = temp % num
// }
// }
// temp := make([]*sdkws.MsgData, 0, size)
// block = &temp
// blockMsgs = append(blockMsgs, block)
// } else {
// block = blockMsgs[index]
// }
// *block = append(*block, msgList[i])
//}
//create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
//if !create {
// exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
// if err != nil {
// return err
// }
// create = !exist
//}
//for i, msgs := range blockMsgs {
// docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
// if create || i != 0 { // 插入
// doc := unRelationTb.MsgDocModel{
// DocID: docID,
// Msg: make([]unRelationTb.MsgInfoModel, num),
// }
// for i := 0; i < len(doc.Msg); i++ {
// doc.Msg[i].ReadList = []string{}
// doc.Msg[i].DelList = []string{}
// }
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// ReadList: []string{},
// DelList: []string{},
// }
// }
// if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return utils.Wrap(err, "")
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// } else { // 修改
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// info := unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// }
// if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return err
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// }
// }
//}
return nil
}
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
msgs := []*unRelationTb.MsgInfoModel{{Revoke: revoke}}
return db.BatchInsertBlock(ctx, conversationID, msgs, seq)
//return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg)
} }
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
@ -330,10 +423,11 @@ func (db *commonMsgDatabase) GetOldestMsg(ctx context.Context, conversationID st
func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{} msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb) // todo: unmarshal
if err != nil { //err = proto.Unmarshal(msgInfo.Msg, msgPb)
return nil, utils.Wrap(err, "") //if err != nil {
} // return nil, utils.Wrap(err, "")
//}
return msgPb, nil return msgPb, nil
} }
@ -566,69 +660,70 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// recursion 删除list并且返回设置的最小seq // recursion 删除list并且返回设置的最小seq
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list // find from oldest list
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index) //msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
if err != nil || msgs.DocID == "" { //if err != nil || msgs.DocID == "" {
if err != nil { // if err != nil {
if err == unrelation.ErrMsgListNotExist { // if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) // log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else { // } else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) // log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
} // }
} // }
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 // // 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) // err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
if err != nil { // if err != nil {
return 0, err // return 0, err
} // }
return delStruct.getSetMinSeq() + 1, nil // return delStruct.getSetMinSeq() + 1, nil
} //}
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg)) //log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg))
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { //if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) // log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
} //}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { //if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID) // delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID)
lastMsgPb := &sdkws.MsgData{} // lastMsgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
if err != nil { // if err != nil {
log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID) // log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed") // return 0, utils.Wrap(err, "proto.Unmarshal failed")
} // }
delStruct.minSeq = lastMsgPb.Seq // delStruct.minSeq = lastMsgPb.Seq
} else { //} else {
var hasMarkDelFlag bool // var hasMarkDelFlag bool
for i, msg := range msgs.Msg { // for i, msg := range msgs.Msg {
if msg.SendTime != 0 { // if msg.SendTime != 0 {
msgPb := &sdkws.MsgData{} // msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb) // err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil { // if err != nil {
log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID) // log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
return 0, utils.Wrap(err, "proto.Unmarshal failed") // return 0, utils.Wrap(err, "proto.Unmarshal failed")
} // }
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { // if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
msgPb.Status = constant.MsgDeleted // msgPb.Status = constant.MsgDeleted
bytes, _ := proto.Marshal(msgPb) // bytes, _ := proto.Marshal(msgPb)
msg.Msg = bytes // msg.Msg = bytes
msg.SendTime = 0 // msg.SendTime = 0
hasMarkDelFlag = true // hasMarkDelFlag = true
} else { // } else {
// 到本条消息不需要删除, minSeq置为这条消息的seq // // 到本条消息不需要删除, minSeq置为这条消息的seq
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { // if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
return 0, err // return 0, err
} // }
if hasMarkDelFlag { // if hasMarkDelFlag {
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { // if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
return delStruct.getSetMinSeq(), err // return delStruct.getSetMinSeq(), err
} // }
} // }
return msgPb.Seq, nil // return msgPb.Seq, nil
} // }
} // }
} // }
} //}
// 继续递归 index+1 //// 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) //seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err //return seq, err
return 0, nil
} }
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {
@ -679,6 +774,10 @@ func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, con
return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs) return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs)
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status) return db.cache.SetSendMsgStatus(ctx, id, status)
} }

View File

@ -2,13 +2,12 @@ package controller
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"math/rand" "math/rand"
"sort"
"strconv" "strconv"
"sync" "sync"
"testing" "testing"
@ -38,6 +37,7 @@ func Test_BatchInsertChat2DB(t *testing.T) {
db := &commonMsgDatabase{ db := &commonMsgDatabase{
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()),
} }
//ctx := context.Background() //ctx := context.Background()
//msgs := make([]*sdkws.MsgData, 0, 1) //msgs := make([]*sdkws.MsgData, 0, 1)
//for i := 0; i < cap(msgs); i++ { //for i := 0; i < cap(msgs); i++ {
@ -125,523 +125,110 @@ func Test_BatchInsertChat2DB(t *testing.T) {
} }
func TestName(t *testing.T) { func GetDB() *commonMsgDatabase {
s := ` [ config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
189, config.Config.Mongo.DBTimeout = 60
498, config.Config.Mongo.DBDatabase = "openIM"
310, config.Config.Mongo.DBSource = "admin"
163, config.Config.Mongo.DBUserName = "root"
313, config.Config.Mongo.DBPassword = "openIM123"
335, config.Config.Mongo.DBMaxPoolSize = 100
327, config.Config.Mongo.DBRetainChatRecords = 3650
342, config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
123,
97,
4,
362,
210,
298,
436,
9,
369,
432,
132,
69,
248,
93,
91,
112,
145,
194,
84,
443,
179,
241,
257,
237,
169,
460,
33,
441,
126,
187,
390,
402,
51,
35,
455,
175,
389,
61,
309,
467,
492,
453,
159,
276,
165,
417,
173,
157,
12,
209,
269,
36,
226,
356,
92,
267,
482,
318,
219,
119,
176,
245,
74,
13,
450,
196,
215,
28,
167,
366,
442,
201,
341,
68,
2,
484,
328,
44,
423,
403,
105,
109,
480,
271,
134,
336,
299,
148,
365,
135,
277,
87,
244,
301,
218,
59,
280,
283,
55,
499,
133,
316,
407,
146,
56,
394,
386,
297,
285,
137,
58,
214,
142,
6,
124,
48,
60,
212,
75,
50,
412,
458,
127,
45,
266,
202,
368,
138,
260,
41,
193,
88,
114,
410,
95,
382,
416,
281,
434,
359,
98,
462,
300,
352,
230,
247,
117,
64,
287,
405,
224,
19,
259,
305,
220,
150,
477,
111,
448,
78,
103,
7,
385,
151,
429,
325,
273,
317,
470,
454,
170,
223,
5,
307,
396,
315,
53,
154,
446,
24,
255,
227,
76,
456,
250,
321,
330,
391,
355,
49,
479,
387,
216,
39,
251,
312,
217,
136,
262,
322,
344,
466,
242,
100,
388,
38,
323,
376,
379,
279,
239,
85,
306,
181,
485,
120,
333,
334,
17,
395,
81,
374,
147,
139,
185,
42,
1,
424,
199,
225,
113,
438,
128,
338,
156,
493,
46,
160,
11,
3,
171,
464,
62,
238,
431,
440,
302,
65,
308,
348,
125,
174,
195,
77,
392,
249,
82,
350,
444,
232,
186,
494,
384,
275,
129,
294,
246,
357,
102,
96,
73,
15,
263,
296,
236,
29,
340,
152,
149,
143,
437,
172,
190,
34,
158,
254,
295,
483,
397,
337,
72,
343,
178,
404,
270,
346,
205,
377,
486,
497,
370,
414,
240,
360,
490,
94,
256,
8,
54,
398,
183,
228,
162,
399,
289,
83,
86,
197,
243,
57,
25,
288,
488,
372,
168,
206,
188,
491,
452,
353,
478,
421,
221,
430,
184,
204,
26,
211,
140,
155,
468,
161,
420,
303,
30,
449,
131,
500,
20,
71,
79,
445,
425,
293,
411,
400,
320,
474,
272,
413,
329,
177,
122,
21,
347,
314,
451,
101,
367,
311,
40,
476,
415,
418,
363,
282,
469,
89,
274,
481,
475,
203,
268,
393,
261,
200,
121,
164,
472,
10,
284,
14,
358,
153,
383,
67,
473,
373,
191,
144,
16,
345,
361,
433,
116,
331,
489,
66,
106,
487,
426,
99,
27,
141,
264,
439,
371,
213,
18,
253,
292,
130,
409,
278,
419,
90,
496,
447,
465,
461,
339,
80,
31,
70,
233,
326,
37,
265,
252,
222,
118,
198,
406,
286,
380,
104,
304,
351,
408,
180,
22,
364,
381,
401,
234,
375,
459,
319,
229,
207,
291,
52,
463,
427,
23,
235,
32,
208,
192,
349,
231,
354,
435,
182,
428,
332,
378,
290,
108,
258,
471,
115,
47,
457,
166,
43,
495,
63,
110,
107,
422,
324
]`
var arr []int mongo, err := unrelation.NewMongo()
if err != nil {
panic(err)
}
err = mongo.GetDatabase().Client().Ping(context.Background(), nil)
if err != nil {
panic(err)
}
return &commonMsgDatabase{
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()),
}
}
if err := json.Unmarshal([]byte(s), &arr); err != nil { func Test_Insert(t *testing.T) {
db := GetDB()
ctx := context.Background()
var arr []*unRelationTb.MsgInfoModel
for i := 0; i < 345; i++ {
arr = append(arr, &unRelationTb.MsgInfoModel{
Msg: &unRelationTb.MsgDataModel{
Seq: int64(i),
Content: fmt.Sprintf("test-%d", i),
},
})
}
if err := db.BatchInsertBlock(ctx, "test", arr, 0); err != nil {
t.Fatal(err)
}
}
func Test_Revoke(t *testing.T) {
db := GetDB()
ctx := context.Background()
var arr []*unRelationTb.MsgInfoModel
for i := 0; i < 456; i++ {
arr = append(arr, &unRelationTb.MsgInfoModel{
Revoke: &unRelationTb.RevokeModel{
UserID: "uid_" + strconv.Itoa(i),
Nickname: "uname_" + strconv.Itoa(i),
Time: time.Now().UnixMilli(),
},
})
}
if err := db.BatchInsertBlock(ctx, "test", arr, 123); err != nil {
t.Fatal(err)
}
}
func Test_Delete(t *testing.T) {
db := GetDB()
ctx := context.Background()
var arr []*unRelationTb.MsgInfoModel
for i := 0; i < 123; i++ {
arr = append(arr, &unRelationTb.MsgInfoModel{
DelList: []string{"uid_1", "uid_2"},
})
}
if err := db.BatchInsertBlock(ctx, "test", arr, 210); err != nil {
t.Fatal(err)
}
}
func Test_Delete1(t *testing.T) {
config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
config.Config.Mongo.DBTimeout = 60
config.Config.Mongo.DBDatabase = "openIM"
config.Config.Mongo.DBSource = "admin"
config.Config.Mongo.DBUserName = "root"
config.Config.Mongo.DBPassword = "openIM123"
config.Config.Mongo.DBMaxPoolSize = 100
config.Config.Mongo.DBRetainChatRecords = 3650
config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
mongo, err := unrelation.NewMongo()
if err != nil {
panic(err)
}
err = mongo.GetDatabase().Client().Ping(context.Background(), nil)
if err != nil {
panic(err) panic(err)
} }
sort.Ints(arr) c := mongo.GetClient().Database("openIM").Collection("msg")
for i, v := range arr { var o unRelationTb.MsgDocModel
fmt.Println(i, v, v == i+1)
if v != i+1 { err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o)
panic(fmt.Sprintf("expected %d, got %d", i+1, v)) if err != nil {
} panic(err)
}
for i, model := range o.Msg {
fmt.Println(i, model == nil)
} }
} }

View File

@ -2,10 +2,10 @@ package unrelation
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"go.mongodb.org/mongo-driver/mongo"
"strconv" "strconv"
"strings" "strings"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
) )
const ( const (
@ -16,22 +16,60 @@ const (
) )
type MsgDocModel struct { type MsgDocModel struct {
DocID string `bson:"doc_id"` DocID string `bson:"doc_id"`
Msg []MsgInfoModel `bson:"msgs"` Msg []*MsgInfoModel `bson:"msgs"`
}
type RevokeModel struct {
UserID string `bson:"user_id"`
Nickname string `bson:"nickname"`
Time int64 `bson:"time"`
}
type OfflinePushModel struct {
Title string `bson:"title"`
Desc string `bson:"desc"`
Ex string `bson:"ex"`
IOSPushSound string `bson:"ios_push_sound"`
IOSBadgeCount bool `bson:"ios_badge_count"`
}
type MsgDataModel struct {
SendID string `bson:"send_id"`
RecvID string `bson:"recv_id"`
GroupID string `bson:"group_id"`
ClientMsgID string `bson:"client_msg_id"`
ServerMsgID string `bson:"server_msg_id"`
SenderPlatformID int32 `bson:"sender_platform_id"`
SenderNickname string `bson:"sender_nickname"`
SenderFaceURL string `bson:"sender_face_url"`
SessionType int32 `bson:"session_type"`
MsgFrom int32 `bson:"msg_from"`
ContentType int32 `bson:"content_type"`
Content string `bson:"content"`
Seq int64 `bson:"seq"`
SendTime int64 `bson:"send_time"`
CreateTime int64 `bson:"create_time"`
Status int32 `bson:"status"`
Options map[string]bool `bson:"options"`
OfflinePush *OfflinePushModel `bson:"offline_push"`
AtUserIDList []string `bson:"at_user_id_list"`
AttachedInfo string `bson:"attached_info"`
Ex string `bson:"ex"`
} }
type MsgInfoModel struct { type MsgInfoModel struct {
SendTime int64 `bson:"sendtime"` Msg *MsgDataModel `bson:"msg"`
Msg []byte `bson:"msg"` Revoke *RevokeModel `bson:"revoke"`
Revoke bool `bson:"revoke"` DelList []string `bson:"del_list"`
ReadList []string `bson:"read_list"` ReadList []string `bson:"read_list"`
DelList []string `bson:"del_list"`
} }
type MsgDocModelInterface interface { type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error Create(ctx context.Context, model *MsgDocModel) error
UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error) IsExistDocID(ctx context.Context, docID string) (bool, error)
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
@ -54,7 +92,8 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
} }
func (m *MsgDocModel) IsFull() bool { func (m *MsgDocModel) IsFull() bool {
return m.Msg[len(m.Msg)-1].SendTime != 0 //return m.Msg[len(m.Msg)-1].SendTime != 0
return false
} }
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -26,7 +25,16 @@ type MsgMongoDriver struct {
} }
func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface { func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface {
return &MsgMongoDriver{MsgCollection: database.Collection(table.MsgDocModel{}.TableName())} collection := database.Collection(table.MsgDocModel{}.TableName())
indexModel := mongo.IndexModel{
Keys: bson.M{"doc_id": 1},
Options: options.Index().SetUnique(true),
}
_, err := collection.Indexes().CreateOne(context.Background(), indexModel)
if err != nil {
panic(err)
}
return &MsgMongoDriver{MsgCollection: collection}
} }
func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.MsgInfoModel) error { func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.MsgInfoModel) error {
@ -38,12 +46,41 @@ func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) e
return err return err
} }
func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, info *table.MsgInfoModel) error { func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d", index): info}}) var field string
if err != nil { if key == "" {
return utils.Wrap(err, "") field = fmt.Sprintf("msgs.%d", index)
} else {
field = fmt.Sprintf("msgs.%d.%s", index, key)
} }
return nil filter := bson.M{"doc_id": docID}
update := bson.M{"$set": bson.M{field: value}}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
}
// PushUnique value must slice
func (m *MsgMongoDriver) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
var field string
if key == "" {
field = fmt.Sprintf("msgs.%d", index)
} else {
field = fmt.Sprintf("msgs.%d.%s", index, key)
}
filter := bson.M{"doc_id": docID}
update := bson.M{
"$addToSet": bson.M{
field: bson.M{"$each": value},
},
}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
} }
func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error { func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
@ -74,33 +111,33 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab
} }
func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := m.FindOneByDocID(ctx, docID) //doc, err := m.FindOneByDocID(ctx, docID)
if err != nil { //if err != nil {
return nil, nil, nil, err // return nil, nil, nil, err
} //}
singleCount := 0 //singleCount := 0
var hasSeqList []int64 //var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ { //for i := 0; i < len(doc.Msg); i++ {
var msg sdkws.MsgData // var msg sdkws.MsgData
if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { // if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil {
return nil, nil, nil, err // return nil, nil, nil, err
} // }
if utils.Contain(msg.Seq, seqs...) { // if utils.Contain(msg.Seq, seqs...) {
indexes = append(indexes, i) // indexes = append(indexes, i)
seqMsgs = append(seqMsgs, &msg) // seqMsgs = append(seqMsgs, &msg)
hasSeqList = append(hasSeqList, msg.Seq) // hasSeqList = append(hasSeqList, msg.Seq)
singleCount++ // singleCount++
if singleCount == len(seqs) { // if singleCount == len(seqs) {
break // break
} // }
} // }
} //}
for _, i := range seqs { //for _, i := range seqs {
if utils.Contain(i, hasSeqList...) { // if utils.Contain(i, hasSeqList...) {
continue // continue
} // }
unExistSeqs = append(unExistSeqs, i) // unExistSeqs = append(unExistSeqs, i)
} //}
return seqMsgs, indexes, unExistSeqs, nil return seqMsgs, indexes, unExistSeqs, nil
} }
@ -122,49 +159,49 @@ func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID stri
} }
func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
var msgDocs []table.MsgDocModel //var msgDocs []table.MsgDocModel
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": -1})) //cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": -1}))
if err != nil { //if err != nil {
return nil, utils.Wrap(err, "") // return nil, utils.Wrap(err, "")
} //}
err = cursor.All(ctx, &msgDocs) //err = cursor.All(ctx, &msgDocs)
if err != nil { //if err != nil {
return nil, utils.Wrap(err, "") // return nil, utils.Wrap(err, "")
} //}
if len(msgDocs) > 0 { //if len(msgDocs) > 0 {
if len(msgDocs[0].Msg) > 0 { // if len(msgDocs[0].Msg) > 0 {
return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil // return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil
} // }
return nil, errs.ErrRecordNotFound.Wrap("len(msgDocs[0].Msgs) < 0") // return nil, errs.ErrRecordNotFound.Wrap("len(msgDocs[0].Msgs) < 0")
} //}
return nil, ErrMsgNotFound return nil, ErrMsgNotFound
} }
func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
var msgDocs []table.MsgDocModel //var msgDocs []table.MsgDocModel
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1})) //cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1}))
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
err = cursor.All(ctx, &msgDocs) //err = cursor.All(ctx, &msgDocs)
if err != nil { //if err != nil {
return nil, utils.Wrap(err, "") // return nil, utils.Wrap(err, "")
} //}
var oldestMsg table.MsgInfoModel //var oldestMsg table.MsgInfoModel
if len(msgDocs) > 0 { //if len(msgDocs) > 0 {
for _, v := range msgDocs[0].Msg { // for _, v := range msgDocs[0].Msg {
if v.SendTime != 0 { // if v.SendTime != 0 {
oldestMsg = v // oldestMsg = v
break // break
} // }
} // }
if len(oldestMsg.Msg) == 0 { // if len(oldestMsg.Msg) == 0 {
if len(msgDocs[0].Msg) > 0 { // if len(msgDocs[0].Msg) > 0 {
oldestMsg = msgDocs[0].Msg[0] // oldestMsg = msgDocs[0].Msg[0]
} // }
} // }
return &oldestMsg, nil // return &oldestMsg, nil
} //}
return nil, ErrMsgNotFound return nil, ErrMsgNotFound
} }
@ -182,50 +219,50 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
} }
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) { func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) //beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
beginIndex := m.msg.GetMsgIndex(beginSeq) //beginIndex := m.msg.GetMsgIndex(beginSeq)
num := endSeq - beginSeq + 1 //num := endSeq - beginSeq + 1
pipeline := bson.A{ //pipeline := bson.A{
bson.M{ // bson.M{
"$match": bson.M{"doc_id": docID}, // "$match": bson.M{"doc_id": docID},
}, // },
bson.M{ // bson.M{
"$project": bson.M{ // "$project": bson.M{
"msgs": bson.M{ // "msgs": bson.M{
"$slice": bson.A{"$msgs", beginIndex, num}, // "$slice": bson.A{"$msgs", beginIndex, num},
}, // },
}, // },
}, // },
} //}
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) //cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil { //if err != nil {
return nil, errs.Wrap(err) // return nil, errs.Wrap(err)
} //}
defer cursor.Close(ctx) //defer cursor.Close(ctx)
var doc table.MsgDocModel //var doc table.MsgDocModel
i := 0 //i := 0
for cursor.Next(ctx) { //for cursor.Next(ctx) {
err := cursor.Decode(&doc) // err := cursor.Decode(&doc)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
if i == 0 { // if i == 0 {
break // break
} // }
} //}
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) //log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
for _, v := range doc.Msg { //for _, v := range doc.Msg {
var msg sdkws.MsgData // var msg sdkws.MsgData
if err := proto.Unmarshal(v.Msg, &msg); err != nil { // if err := proto.Unmarshal(v.Msg, &msg); err != nil {
return nil, err // return nil, err
} // }
if msg.Seq >= beginSeq && msg.Seq <= endSeq { // if msg.Seq >= beginSeq && msg.Seq <= endSeq {
log.ZDebug(ctx, "find msg", "msg", &msg) // log.ZDebug(ctx, "find msg", "msg", &msg)
msgs = append(msgs, &msg) // msgs = append(msgs, &msg)
} else { // } else {
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg) // log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg)
} // }
} //}
return msgs, nil return msgs, nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -44,21 +44,6 @@ message SendMsgResp {
} }
message ClearMsgReq{
string userID = 1;
}
message ClearMsgResp{
}
message SetMsgMinSeqReq{
string userID = 1;
string groupID = 2;
uint32 minSeq = 3;
}
message SetMsgMinSeqResp{
}
message SetSendMsgStatusReq{ message SetSendMsgStatusReq{
int32 status = 1; int32 status = 1;
@ -74,28 +59,6 @@ message GetSendMsgStatusResp{
int32 status = 1; int32 status = 1;
} }
message DelSuperGroupMsgReq{
string userID = 1;
string groupID = 2;
}
message DelSuperGroupMsgResp{
}
message GetSuperGroupMsgReq{
int64 Seq = 1;
string groupID = 2;
}
message GetSuperGroupMsgResp{
sdkws.MsgData msgData = 1;
}
message GetWriteDiffMsgReq{
int64 Seq = 1;
}
message GetWriteDiffMsgResp{
sdkws.MsgData msgData = 2;
}
message ModifyMessageReactionExtensionsReq { message ModifyMessageReactionExtensionsReq {
string conversationID = 1; string conversationID = 1;
@ -205,6 +168,46 @@ message RevokeMsgReq {
message RevokeMsgResp { message RevokeMsgResp {
} }
message ClearConversationsMsgReq {
repeated string conversationIDs = 1;
string userID = 2;
}
message ClearConversationsMsgResp {
}
message UserClearAllMsgReq {
string userID = 1;
}
message UserClearAllMsgResp {
}
message DeleteMsgsReq {
string conversationID = 1;
repeated int64 seqs = 2;
string userID = 3;
}
message DeleteMsgsResp {
}
message DeleteMsgPhysicalReq {
repeated string conversationIDs = 1;
int64 remainTime = 2;
}
message DeleteMsgPhysicalResp {
}
message DeleteMsgPhysicalBySeqReq {
string conversationID = 1;
repeated int64 seqs = 2;
}
message DeleteMsgPhysicalBySeqResp {
}
service msg { service msg {
//seq //seq
rpc GetMaxSeq(sdkws.GetMaxSeqReq) returns(sdkws.GetMaxSeqResp); rpc GetMaxSeq(sdkws.GetMaxSeqReq) returns(sdkws.GetMaxSeqResp);
@ -212,12 +215,18 @@ service msg {
rpc PullMessageBySeqs(sdkws.PullMessageBySeqsReq) returns(sdkws.PullMessageBySeqsResp); rpc PullMessageBySeqs(sdkws.PullMessageBySeqsReq) returns(sdkws.PullMessageBySeqsResp);
// //
rpc SendMsg(SendMsgReq) returns(SendMsgResp); rpc SendMsg(SendMsgReq) returns(SendMsgResp);
//
rpc DelMsgs(DelMsgsReq) returns(DelMsgsResp); // min seq seq大1
// rpc ClearConversationsMsg(ClearConversationsMsgReq) returns(ClearConversationsMsgResp);
rpc DelSuperGroupMsg(DelSuperGroupMsgReq) returns(DelSuperGroupMsgResp); // min seq seq大1
// rpc UserClearAllMsg(UserClearAllMsgReq) returns(UserClearAllMsgResp);
rpc ClearMsg(ClearMsgReq) returns(ClearMsgResp); // by Seq
rpc DeleteMsgs(DeleteMsgsReq) returns(DeleteMsgsResp);
// seq物理删除消息
rpc DeleteMsgPhysicalBySeq(DeleteMsgPhysicalBySeqReq) returns(DeleteMsgPhysicalBySeqResp);
// by
rpc DeleteMsgPhysical(DeleteMsgPhysicalReq) returns(DeleteMsgPhysicalResp);
//-api发送的消息 //-api发送的消息
rpc SetSendMsgStatus(SetSendMsgStatusReq) returns(SetSendMsgStatusResp); rpc SetSendMsgStatus(SetSendMsgStatusReq) returns(SetSendMsgStatusResp);
// //

View File

@ -80,6 +80,15 @@ func DistinctAnyGetComparable[E any, K comparable](es []E, fn func(e E) K) []K {
// Distinct 去重 // Distinct 去重
func Distinct[T comparable](ts []T) []T { func Distinct[T comparable](ts []T) []T {
if len(ts) < 2 {
return ts
} else if len(ts) == 2 {
if ts[0] == ts[1] {
return ts[:1]
} else {
return ts
}
}
return DistinctAny(ts, func(t T) T { return DistinctAny(ts, func(t T) T {
return t return t
}) })