diff --git a/internal/rpc/admincms/admin_cms.go b/internal/rpc/admincms/admin_cms.go index 00043ae73..d71d3fa36 100644 --- a/internal/rpc/admincms/admin_cms.go +++ b/internal/rpc/admincms/admin_cms.go @@ -10,8 +10,9 @@ import ( promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" - pbAdminCMS "Open_IM/pkg/proto/admin_cms" - common "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/proto/admincms" + "Open_IM/pkg/proto/sdkws" + "github.com/OpenIMSDK/openKeeper" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -20,7 +21,6 @@ import ( "errors" "net" "strconv" - "strings" "sync" "time" @@ -37,6 +37,8 @@ type adminCMSServer struct { groupInterface controller.GroupInterface userInterface controller.UserInterface chatLogInterface controller.ChatLogInterface + + AuthInterface controller.AuthController } func NewAdminCMSServer(port int) *adminCMSServer { @@ -88,20 +90,12 @@ func (s *adminCMSServer) Run() { srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd - pbAdminCMS.RegisterAdminCMSServer(srv, s) - rpcRegisterIP := config.Config.RpcRegisterIP - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - log.NewInfo("", "rpcRegisterIP ", rpcRegisterIP) - err = rpc.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10, "") + admincms.RegisterAdminCMSServer(srv, s) + zkClient, err := openKeeper.NewClient(config.Config) if err != nil { - log.NewError("0", "RegisterEtcd failed ", err.Error()) - panic(utils.Wrap(err, "register admin module rpc to etcd err")) + panic(err.Error()) } + err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) @@ -110,11 +104,11 @@ func (s *adminCMSServer) Run() { log.NewInfo("0", "message cms rpc success") } -func (s *adminCMSServer) AdminLogin(ctx context.Context, req *pbAdminCMS.AdminLoginReq) (*pbAdminCMS.AdminLoginResp, error) { - resp := &pbAdminCMS.AdminLoginResp{} +func (s *adminCMSServer) AdminLogin(ctx context.Context, req *admincms.AdminLoginReq) (*admincms.AdminLoginResp, error) { + resp := &admincms.AdminLoginResp{} for i, adminID := range config.Config.Manager.AppManagerUid { if adminID == req.AdminID && config.Config.Manager.Secrets[i] == req.Secret { - token, expTime, err := tokenverify.CreateToken(adminID, constant.LinuxPlatformID) + token, err := s.AuthInterface.CreateToken(ctx, adminID, constant.LinuxPlatformID) if err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "generate token failed", "adminID: ", adminID, err.Error()) return nil, err @@ -147,7 +141,7 @@ func (s *adminCMSServer) GetUserToken(ctx context.Context, req *pbAdminCMS.GetUs } func (s *adminCMSServer) GetChatLogs(ctx context.Context, req *pbAdminCMS.GetChatLogsReq) (*pbAdminCMS.GetChatLogsResp, error) { - chatLog := relation.ChatLog{ + chatLog := relationTb.ChatLog{ Content: req.Content, ContentType: req.ContentType, SessionType: req.SessionType, diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index b437e8702..feb1685cc 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -21,7 +21,6 @@ func callbackBeforeCreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) if !config.Config.Callback.CallbackBeforeCreateGroup.Enable { return nil } - log.NewDebug(req.OperationID, utils.GetSelfFuncName(), req.String()) commonCallbackReq := &cbApi.CallbackBeforeCreateGroupReq{ CallbackCommand: constant.CallbackBeforeCreateGroupCommand, OperationID: req.OperationID, diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 776b86b0f..ae427442a 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -8,10 +8,12 @@ import ( type MsgInterface interface { BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) - DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error + // logic delete + DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) - ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) + ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error) } type MsgDatabaseInterface interface { diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 22a8553b2..37ea34161 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -30,11 +30,11 @@ type UserController struct { } // 获取指定用户的信息 如有userID未找到 也返回错误 -func (u *UserController) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { +func (u *UserController) FindWithError(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) { return u.database.FindWithError(ctx, userIDs) } -func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { +func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) { return u.database.Find(ctx, userIDs) } func (u *UserController) Create(ctx context.Context, users []*relationTb.UserModel) error { @@ -90,7 +90,7 @@ func newUserDatabase(db *gorm.DB) *UserDatabase { } // 获取指定用户的信息 如有userID未找到 也返回错误 -func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { +func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) { users, err = u.user.Find(ctx, userIDs) if err != nil { return @@ -102,7 +102,7 @@ func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (use } // 获取指定用户的信息 如有userID未找到 不返回错误 -func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { +func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error) { users, err = u.user.Find(ctx, userIDs) return } @@ -123,7 +123,7 @@ func (u *UserDatabase) UpdateByMap(ctx context.Context, userID string, args map[ } // 获取,如果没找到,不返回错误 -func (u *UserDatabase) Page(ctx context.Context, showNumber, pageNumber int32) (users []*relation2.UserModel, count int64, err error) { +func (u *UserDatabase) Page(ctx context.Context, showNumber, pageNumber int32) (users []*relationTb.UserModel, count int64, err error) { return u.user.Page(ctx, showNumber, pageNumber) } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 0fee6fc26..108e78898 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -49,6 +49,20 @@ func (u UserMsgDocModel) GetSeqUid(uid string, seq uint32) string { return u.getSeqUid(uid, seq) } +func (u UserMsgDocModel) GetDocIDSeqsMap(uid string, seqs []uint32) map[string][]uint32 { + t := make(map[string][]uint32) + for i := 0; i < len(seqs); i++ { + seqUid := u.getSeqUid(uid, seqs[i]) + if value, ok := t[seqUid]; !ok { + var temp []uint32 + t[seqUid] = append(temp, seqs[i]) + } else { + t[seqUid] = append(value, seqs[i]) + } + } + return t +} + func (UserMsgDocModel) getMsgIndex(seq uint32) int { seqSuffix := seq / singleGocMsgNum var index uint32 diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 677d00798..d3a3c29af 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -209,10 +209,11 @@ func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID return seqMsg, nil } -func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) { +// model +func (d *db.DataBases) GetUserMsgListByIndex(docID string, index int64) (*UserChat, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) - regex := fmt.Sprintf("^%s", ID) + regex := fmt.Sprintf("^%s", docID) findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1}) var msgs []UserChat //primitive.Regex{Pattern: regex} @@ -231,6 +232,7 @@ func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, } } +// model func (d *db.DataBases) DelMongoMsgs(IDList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -238,6 +240,7 @@ func (d *db.DataBases) DelMongoMsgs(IDList []string) error { return err } +// model func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)