mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-03 14:31:43 +08:00
Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode
This commit is contained in:
commit
501e03e57e
@ -11,14 +11,13 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||||
s.LongConnServer.SetMessageHandler(rpcclient.NewMsgClient(client))
|
s.LongConnServer.SetDiscoveryRegistry(client)
|
||||||
msggateway.RegisterMsgGatewayServer(server, s)
|
msggateway.RegisterMsgGatewayServer(server, s)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,8 @@ package msggateway
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
@ -50,11 +52,13 @@ var _ MessageHandler = (*GrpcHandler)(nil)
|
|||||||
|
|
||||||
type GrpcHandler struct {
|
type GrpcHandler struct {
|
||||||
msgRpcClient *rpcclient.MsgClient
|
msgRpcClient *rpcclient.MsgClient
|
||||||
|
pushClient *rpcclient.PushClient
|
||||||
validate *validator.Validate
|
validate *validator.Validate
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcHandler(validate *validator.Validate, msgRpcClient *rpcclient.MsgClient) *GrpcHandler {
|
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler {
|
||||||
return &GrpcHandler{msgRpcClient: msgRpcClient, validate: validate}
|
return &GrpcHandler{msgRpcClient: rpcclient.NewMsgClient(client),
|
||||||
|
pushClient: rpcclient.NewPushClient(client), validate: validate}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) {
|
func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) {
|
||||||
@ -137,8 +141,11 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data Req) ([]
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, error) {
|
func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, error) {
|
||||||
//todo
|
req := push.DelUserPushTokenReq{}
|
||||||
resp, err := g.msgRpcClient.PullMessageBySeqList(context, nil)
|
if err := proto.Unmarshal(data.Data, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := g.pushClient.DelUserPushToken(context, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package msggateway
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -10,7 +11,6 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
)
|
)
|
||||||
@ -21,7 +21,8 @@ type LongConnServer interface {
|
|||||||
GetUserAllCons(userID string) ([]*Client, bool)
|
GetUserAllCons(userID string) ([]*Client, bool)
|
||||||
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
|
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
|
||||||
Validate(s interface{}) error
|
Validate(s interface{}) error
|
||||||
SetMessageHandler(msgRpcClient *rpcclient.MsgClient)
|
//SetMessageHandler(msgRpcClient *rpcclient.MsgClient)
|
||||||
|
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
|
||||||
UnRegister(c *Client)
|
UnRegister(c *Client)
|
||||||
Compressor
|
Compressor
|
||||||
Encoder
|
Encoder
|
||||||
@ -51,8 +52,8 @@ type WsServer struct {
|
|||||||
MessageHandler
|
MessageHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WsServer) SetMessageHandler(rpcClient *rpcclient.MsgClient) {
|
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) {
|
||||||
ws.MessageHandler = NewGrpcHandler(ws.validate, rpcClient)
|
ws.MessageHandler = NewGrpcHandler(ws.validate, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WsServer) UnRegister(c *Client) {
|
func (ws *WsServer) UnRegister(c *Client) {
|
||||||
|
@ -2,8 +2,6 @@ package msgtransfer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
|
|
||||||
@ -55,36 +53,47 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
|
|||||||
}
|
}
|
||||||
for _, v := range msgFromMQ.MsgData {
|
for _, v := range msgFromMQ.MsgData {
|
||||||
switch v.ContentType {
|
switch v.ContentType {
|
||||||
case constant.MsgRevokeNotification:
|
case constant.DeleteMessageNotification:
|
||||||
var elem sdkws.NotificationElem
|
deleteMessageTips := sdkws.DeleteMessageTips{}
|
||||||
if err := json.Unmarshal(v.Content, &elem); err != nil {
|
err := proto.Unmarshal(v.Content, &deleteMessageTips)
|
||||||
log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var tips sdkws.RevokeMsgTips
|
|
||||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
|
||||||
log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
|
log.ZError(ctx, "tips unmarshal err:", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(msgs) == 0 {
|
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
|
||||||
log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq)
|
log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs)
|
||||||
continue
|
|
||||||
}
|
|
||||||
msgs[0].Content = []byte(elem.Detail)
|
|
||||||
data, err := proto.Marshal(msgs[0])
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "proto.Marshal MsgData", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil {
|
|
||||||
log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
//case constant.MsgRevokeNotification:
|
||||||
|
// var elem sdkws.NotificationElem
|
||||||
|
// if err := json.Unmarshal(v.Content, &elem); err != nil {
|
||||||
|
// log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content))
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// var tips sdkws.RevokeMsgTips
|
||||||
|
// if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||||
|
// log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content))
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq})
|
||||||
|
// if err != nil {
|
||||||
|
// log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// if len(msgs) == 0 {
|
||||||
|
// log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// msgs[0].Content = []byte(elem.Detail)
|
||||||
|
// data, err := proto.Marshal(msgs[0])
|
||||||
|
// if err != nil {
|
||||||
|
// log.ZError(ctx, "proto.Marshal MsgData", err)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil {
|
||||||
|
// log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,11 +5,13 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
|
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
|
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
|
||||||
@ -22,9 +24,16 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
|
|||||||
if req.RecvID != "" && req.GroupID != "" {
|
if req.RecvID != "" && req.GroupID != "" {
|
||||||
return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time")
|
return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time")
|
||||||
}
|
}
|
||||||
|
if req.Seq < 0 {
|
||||||
|
return nil, errs.ErrArgs.Wrap("seq is invalid")
|
||||||
|
}
|
||||||
if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil {
|
if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
user, err := m.User.GetUserInfo(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
var sessionType int32
|
var sessionType int32
|
||||||
var conversationID string
|
var conversationID string
|
||||||
if req.GroupID == "" {
|
if req.GroupID == "" {
|
||||||
@ -34,6 +43,46 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
|
|||||||
sessionType = constant.SuperGroupChatType
|
sessionType = constant.SuperGroupChatType
|
||||||
conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID)
|
conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID)
|
||||||
}
|
}
|
||||||
|
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, conversationID, []int64{req.Seq})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(msgs) == 0 {
|
||||||
|
return nil, errs.ErrRecordNotFound.Wrap("msg not found")
|
||||||
|
}
|
||||||
|
sendID := msgs[0].SendID
|
||||||
|
if !tokenverify.IsAppManagerUid(ctx) {
|
||||||
|
if req.GroupID == "" {
|
||||||
|
if req.UserID != sendID {
|
||||||
|
return nil, errs.ErrNoPermission.Wrap("no permission")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
members, err := m.Group.GetGroupMemberInfoMap(ctx, req.GroupID, utils.Distinct([]string{req.UserID, sendID}), true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if req.UserID != sendID {
|
||||||
|
roleLevel := members[req.UserID].RoleLevel
|
||||||
|
switch members[req.UserID].RoleLevel {
|
||||||
|
case constant.GroupOwner:
|
||||||
|
case constant.GroupAdmin:
|
||||||
|
if roleLevel != constant.GroupOrdinaryUsers {
|
||||||
|
return nil, errs.ErrNoPermission.Wrap("no permission")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, errs.ErrNoPermission.Wrap("no permission")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = m.MsgDatabase.RevokeMsg(ctx, conversationID, req.Seq, &unRelationTb.RevokeModel{
|
||||||
|
UserID: req.UserID,
|
||||||
|
Nickname: user.Nickname,
|
||||||
|
Time: time.Now().UnixMilli(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
tips := sdkws.RevokeMsgTips{
|
tips := sdkws.RevokeMsgTips{
|
||||||
RevokerUserID: req.UserID,
|
RevokerUserID: req.UserID,
|
||||||
ClientMsgID: "",
|
ClientMsgID: "",
|
||||||
|
@ -27,11 +27,18 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
updateKeyMsg = iota
|
||||||
|
updateKeyRevoke
|
||||||
|
updateKeyDel
|
||||||
|
updateKeyRead
|
||||||
|
)
|
||||||
|
|
||||||
type CommonMsgDatabase interface {
|
type CommonMsgDatabase interface {
|
||||||
// 批量插入消息
|
// 批量插入消息
|
||||||
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
||||||
// 撤回消息
|
// 撤回消息
|
||||||
RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error
|
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
|
||||||
// 刪除redis中消息缓存
|
// 刪除redis中消息缓存
|
||||||
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
|
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
|
||||||
// incrSeq然后批量插入缓存
|
// incrSeq然后批量插入缓存
|
||||||
@ -140,14 +147,33 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error {
|
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
|
||||||
if len(msgList) == 0 {
|
if len(fields) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
num := db.msg.GetSingleGocMsgNum()
|
num := db.msg.GetSingleGocMsgNum()
|
||||||
num = 100
|
//num = 100
|
||||||
if msgList[0].Msg != nil {
|
for i, field := range fields { // 检查类型
|
||||||
firstSeq = msgList[0].Msg.Seq
|
var ok bool
|
||||||
|
switch key {
|
||||||
|
case updateKeyMsg:
|
||||||
|
var msg *unRelationTb.MsgDataModel
|
||||||
|
msg, ok = field.(*unRelationTb.MsgDataModel)
|
||||||
|
if msg != nil && msg.Seq != firstSeq+int64(i) {
|
||||||
|
return errs.ErrInternalServer.Wrap("seq is invalid")
|
||||||
|
}
|
||||||
|
case updateKeyRevoke:
|
||||||
|
_, ok = field.(*unRelationTb.RevokeModel)
|
||||||
|
case updateKeyDel:
|
||||||
|
_, ok = field.([]string)
|
||||||
|
case updateKeyRead:
|
||||||
|
_, ok = field.([]string)
|
||||||
|
default:
|
||||||
|
return errs.ErrInternalServer.Wrap("key is invalid")
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return errs.ErrInternalServer.Wrap("field type is invalid")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
getDocID := func(seq int64) string {
|
getDocID := func(seq int64) string {
|
||||||
return conversationID + ":" + strconv.FormatInt(seq/num, 10)
|
return conversationID + ":" + strconv.FormatInt(seq/num, 10)
|
||||||
@ -156,21 +182,23 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
|||||||
return seq % num
|
return seq % num
|
||||||
}
|
}
|
||||||
// 返回值为true表示数据库存在该文档,false表示数据库不存在该文档
|
// 返回值为true表示数据库存在该文档,false表示数据库不存在该文档
|
||||||
updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) {
|
updateMsgModel := func(seq int64, i int) (bool, error) {
|
||||||
var (
|
var (
|
||||||
res *mongo.UpdateResult
|
res *mongo.UpdateResult
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
if msg.Msg != nil {
|
docID := getDocID(seq)
|
||||||
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg)
|
index := getIndex(seq)
|
||||||
} else if msg.Revoke != nil {
|
field := fields[i]
|
||||||
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke)
|
switch key {
|
||||||
} else if msg.DelList != nil {
|
case updateKeyMsg:
|
||||||
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList)
|
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
|
||||||
// } else if msg.ReadList != nil {
|
case updateKeyRevoke:
|
||||||
// res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList)
|
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
|
||||||
} else {
|
case updateKeyDel:
|
||||||
return false, errs.ErrArgs.Wrap("msg all field is nil")
|
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", field)
|
||||||
|
case updateKeyRead:
|
||||||
|
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", field)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@ -178,33 +206,52 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
|||||||
return res.MatchedCount > 0, nil
|
return res.MatchedCount > 0, nil
|
||||||
}
|
}
|
||||||
tryUpdate := true
|
tryUpdate := true
|
||||||
for i := 0; i < len(msgList); i++ {
|
for i := 0; i < len(fields); i++ {
|
||||||
msg := msgList[i]
|
seq := firstSeq + int64(i) // 当前seq
|
||||||
seq := firstSeq + int64(i)
|
|
||||||
docID := getDocID(seq)
|
|
||||||
if tryUpdate {
|
if tryUpdate {
|
||||||
matched, err := updateMsgModel(docID, getIndex(seq), msg)
|
matched, err := updateMsgModel(seq, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if matched {
|
if matched {
|
||||||
continue
|
continue // 匹配到了,继续下一个(不一定修改)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
doc := unRelationTb.MsgDocModel{
|
doc := unRelationTb.MsgDocModel{
|
||||||
DocID: docID,
|
DocID: getDocID(seq),
|
||||||
Msg: make([]*unRelationTb.MsgInfoModel, num),
|
Msg: make([]*unRelationTb.MsgInfoModel, num),
|
||||||
}
|
}
|
||||||
var insert int
|
var insert int // 插入的数量
|
||||||
for j := i; j < len(msgList); j++ {
|
for j := i; j < len(fields); j++ {
|
||||||
seq = firstSeq + int64(j)
|
seq = firstSeq + int64(j)
|
||||||
if getDocID(seq) != docID {
|
if getDocID(seq) != doc.DocID {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
insert++
|
insert++
|
||||||
doc.Msg[getIndex(seq)] = msgList[j]
|
switch key {
|
||||||
|
case updateKeyMsg:
|
||||||
|
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||||
|
Msg: fields[j].(*unRelationTb.MsgDataModel),
|
||||||
|
}
|
||||||
|
case updateKeyRevoke:
|
||||||
|
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||||
|
Revoke: fields[j].(*unRelationTb.RevokeModel),
|
||||||
|
}
|
||||||
|
case updateKeyDel:
|
||||||
|
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||||
|
DelList: fields[j].([]string),
|
||||||
|
}
|
||||||
|
case updateKeyRead:
|
||||||
|
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||||
|
ReadList: fields[j].([]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for i, model := range doc.Msg {
|
for i, model := range doc.Msg {
|
||||||
|
if model == nil {
|
||||||
|
model = &unRelationTb.MsgInfoModel{}
|
||||||
|
doc.Msg[i] = model
|
||||||
|
}
|
||||||
if model.DelList == nil {
|
if model.DelList == nil {
|
||||||
doc.Msg[i].DelList = []string{}
|
doc.Msg[i].DelList = []string{}
|
||||||
}
|
}
|
||||||
@ -214,111 +261,63 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
|||||||
}
|
}
|
||||||
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
||||||
if mongo.IsDuplicateKeyError(err) {
|
if mongo.IsDuplicateKeyError(err) {
|
||||||
i--
|
i-- // 存在并发,重试当前数据
|
||||||
tryUpdate = true
|
tryUpdate = true // 以修改模式
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tryUpdate = false
|
tryUpdate = false // 当前以插入成功,下一块优先插入模式
|
||||||
i += insert - 1
|
i += insert - 1 // 跳过已插入的数据
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||||
//num := db.msg.GetSingleGocMsgNum()
|
msgs := make([]any, len(msgList))
|
||||||
//currentIndex := currentMaxSeq / num
|
for i, msg := range msgList {
|
||||||
//var blockMsgs []*[]*sdkws.MsgData
|
if msg == nil {
|
||||||
//for i, data := range msgList {
|
continue
|
||||||
// data.Seq = currentMaxSeq + int64(i+1)
|
}
|
||||||
// index := data.Seq/num - currentIndex
|
var offlinePushModel *unRelationTb.OfflinePushModel
|
||||||
// if i == 0 && index == 1 {
|
if msg.OfflinePushInfo != nil {
|
||||||
// index--
|
offlinePushModel = &unRelationTb.OfflinePushModel{
|
||||||
// currentIndex++
|
Title: msg.OfflinePushInfo.Title,
|
||||||
// }
|
Desc: msg.OfflinePushInfo.Desc,
|
||||||
// var block *[]*sdkws.MsgData
|
Ex: msg.OfflinePushInfo.Ex,
|
||||||
// if len(blockMsgs) == int(index) {
|
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
|
||||||
// var size int64
|
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
|
||||||
// if i == 0 {
|
}
|
||||||
// size = num - data.Seq%num
|
}
|
||||||
// } else {
|
msgs[i] = &unRelationTb.MsgDataModel{
|
||||||
// temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
|
SendID: msg.SendID,
|
||||||
// if temp >= num {
|
RecvID: msg.RecvID,
|
||||||
// size = num
|
GroupID: msg.GroupID,
|
||||||
// } else {
|
ClientMsgID: msg.ClientMsgID,
|
||||||
// size = temp % num
|
ServerMsgID: msg.ServerMsgID,
|
||||||
// }
|
SenderPlatformID: msg.SenderPlatformID,
|
||||||
// }
|
SenderNickname: msg.SenderNickname,
|
||||||
// temp := make([]*sdkws.MsgData, 0, size)
|
SenderFaceURL: msg.SenderFaceURL,
|
||||||
// block = &temp
|
SessionType: msg.SessionType,
|
||||||
// blockMsgs = append(blockMsgs, block)
|
MsgFrom: msg.MsgFrom,
|
||||||
// } else {
|
ContentType: msg.ContentType,
|
||||||
// block = blockMsgs[index]
|
Content: string(msg.Content),
|
||||||
// }
|
Seq: msg.Seq,
|
||||||
// *block = append(*block, msgList[i])
|
SendTime: msg.SendTime,
|
||||||
//}
|
CreateTime: msg.CreateTime,
|
||||||
//create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
|
Status: msg.Status,
|
||||||
//if !create {
|
Options: msg.Options,
|
||||||
// exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
|
OfflinePush: offlinePushModel,
|
||||||
// if err != nil {
|
AtUserIDList: msg.AtUserIDList,
|
||||||
// return err
|
AttachedInfo: msg.AttachedInfo,
|
||||||
// }
|
Ex: msg.Ex,
|
||||||
// create = !exist
|
}
|
||||||
//}
|
}
|
||||||
//for i, msgs := range blockMsgs {
|
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, currentMaxSeq-int64(len(msgList)))
|
||||||
// docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
|
|
||||||
// if create || i != 0 { // 插入
|
|
||||||
// doc := unRelationTb.MsgDocModel{
|
|
||||||
// DocID: docID,
|
|
||||||
// Msg: make([]unRelationTb.MsgInfoModel, num),
|
|
||||||
// }
|
|
||||||
// for i := 0; i < len(doc.Msg); i++ {
|
|
||||||
// doc.Msg[i].ReadList = []string{}
|
|
||||||
// doc.Msg[i].DelList = []string{}
|
|
||||||
// }
|
|
||||||
// for _, msg := range *msgs {
|
|
||||||
// data, err := proto.Marshal(msg)
|
|
||||||
// if err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
|
|
||||||
// SendTime: msg.SendTime,
|
|
||||||
// Msg: data,
|
|
||||||
// ReadList: []string{},
|
|
||||||
// DelList: []string{},
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
|
||||||
// prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
||||||
// return utils.Wrap(err, "")
|
|
||||||
// }
|
|
||||||
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
||||||
// } else { // 修改
|
|
||||||
// for _, msg := range *msgs {
|
|
||||||
// data, err := proto.Marshal(msg)
|
|
||||||
// if err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// info := unRelationTb.MsgInfoModel{
|
|
||||||
// SendTime: msg.SendTime,
|
|
||||||
// Msg: data,
|
|
||||||
// }
|
|
||||||
// if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
|
|
||||||
// prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error {
|
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
|
||||||
index := seq / db.msg.GetSingleGocMsgNum()
|
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
|
||||||
docID := db.msg.IndexDocID(conversationID, index)
|
|
||||||
return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
|
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
|
||||||
|
@ -197,3 +197,38 @@ func Test_Delete(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_Delete1(t *testing.T) {
|
||||||
|
config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
|
||||||
|
config.Config.Mongo.DBTimeout = 60
|
||||||
|
config.Config.Mongo.DBDatabase = "openIM"
|
||||||
|
config.Config.Mongo.DBSource = "admin"
|
||||||
|
config.Config.Mongo.DBUserName = "root"
|
||||||
|
config.Config.Mongo.DBPassword = "openIM123"
|
||||||
|
config.Config.Mongo.DBMaxPoolSize = 100
|
||||||
|
config.Config.Mongo.DBRetainChatRecords = 3650
|
||||||
|
config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
|
||||||
|
|
||||||
|
mongo, err := unrelation.NewMongo()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
err = mongo.GetDatabase().Client().Ping(context.Background(), nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := mongo.GetClient().Database("openIM").Collection("msg")
|
||||||
|
|
||||||
|
var o unRelationTb.MsgDocModel
|
||||||
|
|
||||||
|
err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, model := range o.Msg {
|
||||||
|
fmt.Println(i, model == nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
33
pkg/rpcclient/push.go
Normal file
33
pkg/rpcclient/push.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package rpcclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PushClient struct {
|
||||||
|
MetaClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPushClient(client discoveryregistry.SvcDiscoveryRegistry) *PushClient {
|
||||||
|
return &PushClient{
|
||||||
|
MetaClient: MetaClient{
|
||||||
|
client: client,
|
||||||
|
rpcRegisterName: config.Config.RpcRegisterName.OpenImPushName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PushClient) DelUserPushToken(ctx context.Context, req push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) {
|
||||||
|
cc, err := p.getConn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
@ -80,6 +80,15 @@ func DistinctAnyGetComparable[E any, K comparable](es []E, fn func(e E) K) []K {
|
|||||||
|
|
||||||
// Distinct 去重
|
// Distinct 去重
|
||||||
func Distinct[T comparable](ts []T) []T {
|
func Distinct[T comparable](ts []T) []T {
|
||||||
|
if len(ts) < 2 {
|
||||||
|
return ts
|
||||||
|
} else if len(ts) == 2 {
|
||||||
|
if ts[0] == ts[1] {
|
||||||
|
return ts[:1]
|
||||||
|
} else {
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
}
|
||||||
return DistinctAny(ts, func(t T) T {
|
return DistinctAny(ts, func(t T) T {
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user