diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 6f0660f36..e4124a5af 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -11,14 +11,13 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway" - "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "google.golang.org/grpc" ) func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - s.LongConnServer.SetMessageHandler(rpcclient.NewMsgClient(client)) + s.LongConnServer.SetDiscoveryRegistry(client) msggateway.RegisterMsgGatewayServer(server, s) return nil } diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 6f630e5c9..6eb936ab0 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -2,6 +2,8 @@ package msggateway import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" @@ -50,11 +52,13 @@ var _ MessageHandler = (*GrpcHandler)(nil) type GrpcHandler struct { msgRpcClient *rpcclient.MsgClient + pushClient *rpcclient.PushClient validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, msgRpcClient *rpcclient.MsgClient) *GrpcHandler { - return &GrpcHandler{msgRpcClient: msgRpcClient, validate: validate} +func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler { + return &GrpcHandler{msgRpcClient: rpcclient.NewMsgClient(client), + pushClient: rpcclient.NewPushClient(client), validate: validate} } func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) { @@ -137,8 +141,11 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data Req) ([] } func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, error) { - //todo - resp, err := g.msgRpcClient.PullMessageBySeqList(context, nil) + req := push.DelUserPushTokenReq{} + if err := proto.Unmarshal(data.Data, &req); err != nil { + return nil, err + } + resp, err := g.pushClient.DelUserPushToken(context, req) if err != nil { return nil, err } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index a4999bdb6..f11891d1e 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -2,6 +2,7 @@ package msggateway import ( "errors" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "net/http" "sync" "sync/atomic" @@ -10,7 +11,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/go-playground/validator/v10" ) @@ -21,7 +21,8 @@ type LongConnServer interface { GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s interface{}) error - SetMessageHandler(msgRpcClient *rpcclient.MsgClient) + //SetMessageHandler(msgRpcClient *rpcclient.MsgClient) + SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) UnRegister(c *Client) Compressor Encoder @@ -51,8 +52,8 @@ type WsServer struct { MessageHandler } -func (ws *WsServer) SetMessageHandler(rpcClient *rpcclient.MsgClient) { - ws.MessageHandler = NewGrpcHandler(ws.validate, rpcClient) +func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { + ws.MessageHandler = NewGrpcHandler(ws.validate, client) } func (ws *WsServer) UnRegister(c *Client) { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 48ac22e7f..ef082d763 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -2,8 +2,6 @@ package msgtransfer import ( "context" - "encoding/json" - "errors" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" @@ -55,36 +53,47 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont } for _, v := range msgFromMQ.MsgData { switch v.ContentType { - case constant.MsgRevokeNotification: - var elem sdkws.NotificationElem - if err := json.Unmarshal(v.Content, &elem); err != nil { - log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) - continue - } - var tips sdkws.RevokeMsgTips - if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) - continue - } - msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) + case constant.DeleteMessageNotification: + deleteMessageTips := sdkws.DeleteMessageTips{} + err := proto.Unmarshal(v.Content, &deleteMessageTips) if err != nil { - log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + log.ZError(ctx, "tips unmarshal err:", err, "msg", msg) continue } - if len(msgs) == 0 { - log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) - continue - } - msgs[0].Content = []byte(elem.Detail) - data, err := proto.Marshal(msgs[0]) - if err != nil { - log.ZError(ctx, "proto.Marshal MsgData", err) - continue - } - if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { - log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { + log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) continue } + //case constant.MsgRevokeNotification: + // var elem sdkws.NotificationElem + // if err := json.Unmarshal(v.Content, &elem); err != nil { + // log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) + // continue + // } + // var tips sdkws.RevokeMsgTips + // if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { + // log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) + // continue + // } + // msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) + // if err != nil { + // log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } + // if len(msgs) == 0 { + // log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } + // msgs[0].Content = []byte(elem.Detail) + // data, err := proto.Marshal(msgs[0]) + // if err != nil { + // log.ZError(ctx, "proto.Marshal MsgData", err) + // continue + // } + // if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { + // log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } } } } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index f0a510212..6d9064f05 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -5,11 +5,13 @@ import ( "encoding/json" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "time" ) func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { @@ -22,9 +24,16 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if req.RecvID != "" && req.GroupID != "" { return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time") } + if req.Seq < 0 { + return nil, errs.ErrArgs.Wrap("seq is invalid") + } if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil { return nil, err } + user, err := m.User.GetUserInfo(ctx, req.UserID) + if err != nil { + return nil, err + } var sessionType int32 var conversationID string if req.GroupID == "" { @@ -34,6 +43,46 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. sessionType = constant.SuperGroupChatType conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID) } + msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, conversationID, []int64{req.Seq}) + if err != nil { + return nil, err + } + if len(msgs) == 0 { + return nil, errs.ErrRecordNotFound.Wrap("msg not found") + } + sendID := msgs[0].SendID + if !tokenverify.IsAppManagerUid(ctx) { + if req.GroupID == "" { + if req.UserID != sendID { + return nil, errs.ErrNoPermission.Wrap("no permission") + } + } else { + members, err := m.Group.GetGroupMemberInfoMap(ctx, req.GroupID, utils.Distinct([]string{req.UserID, sendID}), true) + if err != nil { + return nil, err + } + if req.UserID != sendID { + roleLevel := members[req.UserID].RoleLevel + switch members[req.UserID].RoleLevel { + case constant.GroupOwner: + case constant.GroupAdmin: + if roleLevel != constant.GroupOrdinaryUsers { + return nil, errs.ErrNoPermission.Wrap("no permission") + } + default: + return nil, errs.ErrNoPermission.Wrap("no permission") + } + } + } + } + err = m.MsgDatabase.RevokeMsg(ctx, conversationID, req.Seq, &unRelationTb.RevokeModel{ + UserID: req.UserID, + Nickname: user.Nickname, + Time: time.Now().UnixMilli(), + }) + if err != nil { + return nil, err + } tips := sdkws.RevokeMsgTips{ RevokerUserID: req.UserID, ClientMsgID: "", diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 3885d2bd1..eb243cb47 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -27,11 +27,18 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) +const ( + updateKeyMsg = iota + updateKeyRevoke + updateKeyDel + updateKeyRead +) + type CommonMsgDatabase interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error // 撤回消息 - RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error + RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error // 刪除redis中消息缓存 DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error // incrSeq然后批量插入缓存 @@ -140,14 +147,33 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation return nil } -func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error { - if len(msgList) == 0 { +func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { + if len(fields) == 0 { return nil } num := db.msg.GetSingleGocMsgNum() - num = 100 - if msgList[0].Msg != nil { - firstSeq = msgList[0].Msg.Seq + //num = 100 + for i, field := range fields { // 检查类型 + var ok bool + switch key { + case updateKeyMsg: + var msg *unRelationTb.MsgDataModel + msg, ok = field.(*unRelationTb.MsgDataModel) + if msg != nil && msg.Seq != firstSeq+int64(i) { + return errs.ErrInternalServer.Wrap("seq is invalid") + } + case updateKeyRevoke: + _, ok = field.(*unRelationTb.RevokeModel) + case updateKeyDel: + _, ok = field.([]string) + case updateKeyRead: + _, ok = field.([]string) + default: + return errs.ErrInternalServer.Wrap("key is invalid") + } + if !ok { + return errs.ErrInternalServer.Wrap("field type is invalid") + } } getDocID := func(seq int64) string { return conversationID + ":" + strconv.FormatInt(seq/num, 10) @@ -156,21 +182,23 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return seq % num } // 返回值为true表示数据库存在该文档,false表示数据库不存在该文档 - updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) { + updateMsgModel := func(seq int64, i int) (bool, error) { var ( res *mongo.UpdateResult err error ) - if msg.Msg != nil { - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg) - } else if msg.Revoke != nil { - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke) - } else if msg.DelList != nil { - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList) - // } else if msg.ReadList != nil { - // res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList) - } else { - return false, errs.ErrArgs.Wrap("msg all field is nil") + docID := getDocID(seq) + index := getIndex(seq) + field := fields[i] + switch key { + case updateKeyMsg: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) + case updateKeyRevoke: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) + case updateKeyDel: + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", field) + case updateKeyRead: + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", field) } if err != nil { return false, err @@ -178,33 +206,52 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return res.MatchedCount > 0, nil } tryUpdate := true - for i := 0; i < len(msgList); i++ { - msg := msgList[i] - seq := firstSeq + int64(i) - docID := getDocID(seq) + for i := 0; i < len(fields); i++ { + seq := firstSeq + int64(i) // 当前seq if tryUpdate { - matched, err := updateMsgModel(docID, getIndex(seq), msg) + matched, err := updateMsgModel(seq, i) if err != nil { return err } if matched { - continue + continue // 匹配到了,继续下一个(不一定修改) } } doc := unRelationTb.MsgDocModel{ - DocID: docID, + DocID: getDocID(seq), Msg: make([]*unRelationTb.MsgInfoModel, num), } - var insert int - for j := i; j < len(msgList); j++ { + var insert int // 插入的数量 + for j := i; j < len(fields); j++ { seq = firstSeq + int64(j) - if getDocID(seq) != docID { + if getDocID(seq) != doc.DocID { break } insert++ - doc.Msg[getIndex(seq)] = msgList[j] + switch key { + case updateKeyMsg: + doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + Msg: fields[j].(*unRelationTb.MsgDataModel), + } + case updateKeyRevoke: + doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + Revoke: fields[j].(*unRelationTb.RevokeModel), + } + case updateKeyDel: + doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + DelList: fields[j].([]string), + } + case updateKeyRead: + doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + ReadList: fields[j].([]string), + } + } } for i, model := range doc.Msg { + if model == nil { + model = &unRelationTb.MsgInfoModel{} + doc.Msg[i] = model + } if model.DelList == nil { doc.Msg[i].DelList = []string{} } @@ -214,111 +261,63 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { - i-- - tryUpdate = true + i-- // 存在并发,重试当前数据 + tryUpdate = true // 以修改模式 continue } return err } - tryUpdate = false - i += insert - 1 + tryUpdate = false // 当前以插入成功,下一块优先插入模式 + i += insert - 1 // 跳过已插入的数据 } return nil } func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - //num := db.msg.GetSingleGocMsgNum() - //currentIndex := currentMaxSeq / num - //var blockMsgs []*[]*sdkws.MsgData - //for i, data := range msgList { - // data.Seq = currentMaxSeq + int64(i+1) - // index := data.Seq/num - currentIndex - // if i == 0 && index == 1 { - // index-- - // currentIndex++ - // } - // var block *[]*sdkws.MsgData - // if len(blockMsgs) == int(index) { - // var size int64 - // if i == 0 { - // size = num - data.Seq%num - // } else { - // temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num - // if temp >= num { - // size = num - // } else { - // size = temp % num - // } - // } - // temp := make([]*sdkws.MsgData, 0, size) - // block = &temp - // blockMsgs = append(blockMsgs, block) - // } else { - // block = blockMsgs[index] - // } - // *block = append(*block, msgList[i]) - //} - //create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) - //if !create { - // exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) - // if err != nil { - // return err - // } - // create = !exist - //} - //for i, msgs := range blockMsgs { - // docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i)) - // if create || i != 0 { // 插入 - // doc := unRelationTb.MsgDocModel{ - // DocID: docID, - // Msg: make([]unRelationTb.MsgInfoModel, num), - // } - // for i := 0; i < len(doc.Msg); i++ { - // doc.Msg[i].ReadList = []string{} - // doc.Msg[i].DelList = []string{} - // } - // for _, msg := range *msgs { - // data, err := proto.Marshal(msg) - // if err != nil { - // return err - // } - // doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{ - // SendTime: msg.SendTime, - // Msg: data, - // ReadList: []string{}, - // DelList: []string{}, - // } - // } - // if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { - // prome.Inc(prome.MsgInsertMongoFailedCounter) - // return utils.Wrap(err, "") - // } - // prome.Inc(prome.MsgInsertMongoSuccessCounter) - // } else { // 修改 - // for _, msg := range *msgs { - // data, err := proto.Marshal(msg) - // if err != nil { - // return err - // } - // info := unRelationTb.MsgInfoModel{ - // SendTime: msg.SendTime, - // Msg: data, - // } - // if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil { - // prome.Inc(prome.MsgInsertMongoFailedCounter) - // return err - // } - // prome.Inc(prome.MsgInsertMongoSuccessCounter) - // } - // } - //} - return nil + msgs := make([]any, len(msgList)) + for i, msg := range msgList { + if msg == nil { + continue + } + var offlinePushModel *unRelationTb.OfflinePushModel + if msg.OfflinePushInfo != nil { + offlinePushModel = &unRelationTb.OfflinePushModel{ + Title: msg.OfflinePushInfo.Title, + Desc: msg.OfflinePushInfo.Desc, + Ex: msg.OfflinePushInfo.Ex, + IOSPushSound: msg.OfflinePushInfo.IOSPushSound, + IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, + } + } + msgs[i] = &unRelationTb.MsgDataModel{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + ClientMsgID: msg.ClientMsgID, + ServerMsgID: msg.ServerMsgID, + SenderPlatformID: msg.SenderPlatformID, + SenderNickname: msg.SenderNickname, + SenderFaceURL: msg.SenderFaceURL, + SessionType: msg.SessionType, + MsgFrom: msg.MsgFrom, + ContentType: msg.ContentType, + Content: string(msg.Content), + Seq: msg.Seq, + SendTime: msg.SendTime, + CreateTime: msg.CreateTime, + Status: msg.Status, + Options: msg.Options, + OfflinePush: offlinePushModel, + AtUserIDList: msg.AtUserIDList, + AttachedInfo: msg.AttachedInfo, + Ex: msg.Ex, + } + } + return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, currentMaxSeq-int64(len(msgList))) } -func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error { - index := seq / db.msg.GetSingleGocMsgNum() - docID := db.msg.IndexDocID(conversationID, index) - return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg) +func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error { + return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) } func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index e557176a3..6de3b4031 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -197,3 +197,38 @@ func Test_Delete(t *testing.T) { t.Fatal(err) } } + +func Test_Delete1(t *testing.T) { + config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} + config.Config.Mongo.DBTimeout = 60 + config.Config.Mongo.DBDatabase = "openIM" + config.Config.Mongo.DBSource = "admin" + config.Config.Mongo.DBUserName = "root" + config.Config.Mongo.DBPassword = "openIM123" + config.Config.Mongo.DBMaxPoolSize = 100 + config.Config.Mongo.DBRetainChatRecords = 3650 + config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" + + mongo, err := unrelation.NewMongo() + if err != nil { + panic(err) + } + err = mongo.GetDatabase().Client().Ping(context.Background(), nil) + if err != nil { + panic(err) + } + + c := mongo.GetClient().Database("openIM").Collection("msg") + + var o unRelationTb.MsgDocModel + + err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o) + if err != nil { + panic(err) + } + + for i, model := range o.Msg { + fmt.Println(i, model == nil) + } + +} diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go new file mode 100644 index 000000000..9904c8d2d --- /dev/null +++ b/pkg/rpcclient/push.go @@ -0,0 +1,33 @@ +package rpcclient + +import ( + "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" +) + +type PushClient struct { + MetaClient +} + +func NewPushClient(client discoveryregistry.SvcDiscoveryRegistry) *PushClient { + return &PushClient{ + MetaClient: MetaClient{ + client: client, + rpcRegisterName: config.Config.RpcRegisterName.OpenImPushName, + }, + } +} + +func (p *PushClient) DelUserPushToken(ctx context.Context, req push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { + cc, err := p.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, &req) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/pkg/utils/utils_v2.go b/pkg/utils/utils_v2.go index 9143b0685..777308fd7 100644 --- a/pkg/utils/utils_v2.go +++ b/pkg/utils/utils_v2.go @@ -80,6 +80,15 @@ func DistinctAnyGetComparable[E any, K comparable](es []E, fn func(e E) K) []K { // Distinct 去重 func Distinct[T comparable](ts []T) []T { + if len(ts) < 2 { + return ts + } else if len(ts) == 2 { + if ts[0] == ts[1] { + return ts[:1] + } else { + return ts + } + } return DistinctAny(ts, func(t T) T { return t })