diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 043398422..3ccfb4819 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -90,7 +90,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs msgs, err := db.DB.GetUserMsgListByIndex(ID, index) if err != nil || msgs.UID == "" { if err != nil { - if err == mongo.ErrMsgListNotExist { + if err == mongoDB.ErrMsgListNotExist { log.NewInfo(operationID, utils.GetSelfFuncName(), "ID:", ID, "index:", index, err.Error()) } else { log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) @@ -104,7 +104,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return delStruct.getSetMinSeq() + 1, nil } log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) - if len(msgs.Msg) > mongo.GetSingleGocMsgNum() { + if len(msgs.Msg) > mongoDB.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { @@ -150,7 +150,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return seq, utils.Wrap(err, "deleteMongoMsg failed") } -func msgListIsFull(chat *mongo.UserChat) bool { +func msgListIsFull(chat *mongoDB.UserChat) bool { index, _ := strconv.Atoi(strings.Split(chat.UID, ":")[1]) if index == 0 { if len(chat.Msg) >= 4999 { diff --git a/internal/msg_transfer/logic/modify_msg_handler.go b/internal/msg_transfer/logic/modify_msg_handler.go index 847bf8761..4cdff43fc 100644 --- a/internal/msg_transfer/logic/modify_msg_handler.go +++ b/internal/msg_transfer/logic/modify_msg_handler.go @@ -71,14 +71,14 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg } if !notification.IsReact { // first time to modify - var reactionExtensionList = make(map[string]mongo.KeyValue) - extendMsg := mongo.ExtendMsg{ + var reactionExtensionList = make(map[string]mongoDB.KeyValue) + extendMsg := mongoDB.ExtendMsg{ ReactionExtensionList: reactionExtensionList, ClientMsgID: notification.ClientMsgID, MsgFirstModifyTime: notification.MsgFirstModifyTime, } for _, v := range notification.SuccessReactionExtensionList { - reactionExtensionList[v.TypeKey] = mongo.KeyValue{ + reactionExtensionList[v.TypeKey] = mongoDB.KeyValue{ TypeKey: v.TypeKey, Value: v.Value, LatestUpdateTime: v.LatestUpdateTime, diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index d3bfdc694..df14f5b7d 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -4,7 +4,7 @@ import ( cbApi "Open_IM/pkg/callback_struct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + imdb "Open_IM/pkg/common/db/mysql" "Open_IM/pkg/common/http" "Open_IM/pkg/common/log" "Open_IM/pkg/common/trace_log" @@ -77,7 +77,7 @@ func callbackBeforeCreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) return err } -func CallbackBeforeMemberJoinGroup(ctx context.Context, operationID string, groupMember *im_mysql_model.GroupMember, groupEx string) (err error) { +func CallbackBeforeMemberJoinGroup(ctx context.Context, operationID string, groupMember *imdb.GroupMember, groupEx string) (err error) { defer func() { trace_log.SetCtxInfo(ctx, utils.GetFuncName(1), err, "groupMember", *groupMember, "groupEx", groupEx) }() diff --git a/internal/rpc/group/g.go b/internal/rpc/group/g.go index 55c0b838f..73cb06bbd 100644 --- a/internal/rpc/group/g.go +++ b/internal/rpc/group/g.go @@ -2,7 +2,7 @@ package group import ( "Open_IM/pkg/common/constant" - imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + imdb "Open_IM/pkg/common/db/mysql" "Open_IM/pkg/common/tools" pbGroup "Open_IM/pkg/proto/group" sdk "Open_IM/pkg/proto/sdk_ws" diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 0109743f4..2bdb3ea56 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -5,16 +5,16 @@ import ( chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/db/model" + imdb "Open_IM/pkg/common/db/mysql" "Open_IM/pkg/common/log" "Open_IM/pkg/common/middleware" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tools" "Open_IM/pkg/common/trace_log" - cp "Open_IM/pkg/common/utils" + + cp "Open_IM/internal/utils" "Open_IM/pkg/getcdv3" pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" @@ -41,6 +41,8 @@ type groupServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + imdb.GroupInterface + } func NewGroupServer(port int) *groupServer { @@ -131,39 +133,26 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR for _, adminUserID := range req.AdminUserIDs { userIDs = append(userIDs, adminUserID) } - if utils.IsRepeatID(userIDs) { + if utils.IsDuplicateID(userIDs) { return nil, constant.ErrArgs.Wrap("group member is repeated") } - users, err := getUsersInfo(ctx, userIDs) if err != nil { return nil, err } - userMap := make(map[string]*open_im_sdk.UserInfo) for i, user := range users { userMap[user.UserID] = users[i] } - if err := callbackBeforeCreateGroup(ctx, req); err != nil { return nil, err } - groupId := genGroupID(ctx, req.GroupInfo.GroupID) - - groupInfo := imdb.Group{} - utils.CopyStructFields(&groupInfo, req.GroupInfo) - groupInfo.CreatorUserID = tools.OpUserID(ctx) - groupInfo.GroupID = groupId - groupInfo.CreateTime = time.Now() - if groupInfo.NotificationUpdateTime.Unix() < 0 { - groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0) - } - + groupInfo, err := (&cp.PBGroup{req.GroupInfo}).Convert() + groupInfo.GroupID = genGroupID(ctx, req.GroupInfo.GroupID) if req.GroupInfo.GroupType != constant.SuperGroup { - var groupMembers []*imdb.GroupMember joinGroup := func(userID string, roleLevel int32) error { - groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: tools.OpUserID(ctx), JoinSource: constant.JoinByInvitation, InviterUserID: tools.OpUserID(ctx)} + groupMember := &imdb.GroupMember{GroupID: groupInfo.GroupID, RoleLevel: roleLevel, OperatorUserID: tools.OpUserID(ctx), JoinSource: constant.JoinByInvitation, InviterUserID: tools.OpUserID(ctx)} user := userMap[userID] utils.CopyStructFields(&groupMember, user) if err := CallbackBeforeMemberJoinGroup(ctx, tools.OperationID(ctx), groupMember, groupInfo.Ex); err != nil { @@ -172,17 +161,22 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR groupMembers = append(groupMembers, groupMember) return nil } - if req.OwnerUserID == "" { - if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { - return nil, err - } - } - for _, info := range req.InitMemberList { - if err := joinGroup(info.UserID, info.RoleLevel); err != nil { - return nil, err - } + + if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { + return nil, err } + for _, info := range req.InitMemberList { + if err := joinGroup(info, constant.GroupOrdinaryUsers); err != nil { + return nil, err + } + } + for _, info := range req.AdminUserIDs { + if err := joinGroup(info, constant.GroupAdmin); err != nil { + return nil, err + } + } + if err := model. if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil { return nil, err } diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index ce3938036..583656e70 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -297,7 +297,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR } } - var tagSendLogs mongo.TagSendLog + var tagSendLogs mongoDB.TagSendLog var wg sync.WaitGroup wg.Add(len(successUserIDList)) var lock sync.Mutex @@ -310,7 +310,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR return } lock.Lock() - tagSendLogs.UserList = append(tagSendLogs.UserList, mongo.TagUser{ + tagSendLogs.UserList = append(tagSendLogs.UserList, mongoDB.TagUser{ UserID: userID, UserName: userName, }) @@ -389,10 +389,10 @@ func (s *officeServer) GetUserTagByID(_ context.Context, req *pbOffice.GetUserTa func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.CreateOneWorkMomentReq) (resp *pbOffice.CreateOneWorkMomentResp, err error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp = &pbOffice.CreateOneWorkMomentResp{CommonResp: &pbOffice.CommonResp{}} - workMoment := mongo.WorkMoment{ - Comments: []*mongo.Comment{}, - LikeUserList: []*mongo.WorkMomentUser{}, - PermissionUserList: []*mongo.WorkMomentUser{}, + workMoment := mongoDB.WorkMoment{ + Comments: []*mongoDB.Comment{}, + LikeUserList: []*mongoDB.WorkMomentUser{}, + PermissionUserList: []*mongoDB.WorkMomentUser{}, } createUser, err := imdb.GetUserByUserID(req.WorkMoment.UserID) if err != nil { @@ -406,14 +406,14 @@ func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.Crea workMoment.UserName = createUser.Nickname workMoment.FaceURL = createUser.FaceURL workMoment.PermissionUserIDList = s.getPermissionUserIDList(req.OperationID, req.WorkMoment.PermissionGroupList, req.WorkMoment.PermissionUserList) - workMoment.PermissionUserList = []*mongo.WorkMomentUser{} + workMoment.PermissionUserList = []*mongoDB.WorkMomentUser{} for _, userID := range workMoment.PermissionUserIDList { userName, err := imdb.GetUserNameByUserID(userID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error()) continue } - workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongo.WorkMomentUser{ + workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongoDB.WorkMomentUser{ UserID: userID, UserName: userName, }) @@ -503,7 +503,7 @@ func (s *officeServer) DeleteOneWorkMoment(_ context.Context, req *pbOffice.Dele return resp, nil } -func isUserCanSeeWorkMoment(userID string, workMoment mongo.WorkMoment) bool { +func isUserCanSeeWorkMoment(userID string, workMoment mongoDB.WorkMoment) bool { if userID != workMoment.UserID { switch workMoment.Permission { case constant.WorkMomentPublic: @@ -570,7 +570,7 @@ func (s *officeServer) CommentOneWorkMoment(_ context.Context, req *pbOffice.Com return resp, nil } } - comment := &mongo.Comment{ + comment := &mongoDB.Comment{ UserID: req.UserID, UserName: commentUser.Nickname, ReplyUserID: req.ReplyUserID, @@ -644,7 +644,7 @@ func (s *officeServer) GetUserWorkMoments(_ context.Context, req *pbOffice.GetUs log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp = &pbOffice.GetUserWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}} resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber} - var workMoments []mongo.WorkMoment + var workMoments []mongoDB.WorkMoment if req.UserID == req.OpUserID { workMoments, err = db.DB.GetUserSelfWorkMoments(req.UserID, req.Pagination.ShowNumber, req.Pagination.PageNumber) } else { diff --git a/internal/utils/convert.go b/internal/utils/convert.go index 3d6ccf549..690c6eedc 100644 --- a/internal/utils/convert.go +++ b/internal/utils/convert.go @@ -1,7 +1,7 @@ package utils import ( - imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + imdb "Open_IM/pkg/common/db/mysql" sdk "Open_IM/pkg/proto/sdk_ws" utils2 "Open_IM/pkg/utils" utils "github.com/OpenIMSDK/open_utils" @@ -120,7 +120,7 @@ type PBGroup struct { *sdk.GroupInfo } -func (pb *PBGroup) convert() (*imdb.Group, error) { +func (pb *PBGroup) Convert() (*imdb.Group, error) { dst := &imdb.Group{} utils.CopyStructFields(dst, pb) return dst, nil @@ -207,44 +207,31 @@ func (db *DBGroupRequest) convert() (*sdk.GroupRequest, error) { } type DBUser struct { - *imdb + *imdb.User } type PBUser struct { *sdk.UserInfo } -func (pb *PBUser) convert() (*DBUser, error) { - dst := &DBUser{} +func (pb *PBUser) convert() (*imdb.User, error) { + dst := &imdb.User{} utils.CopyStructFields(dst, pb) - - utils.CopyStructFields(dst, src) - dst.Birth, _ = utils.TimeStringToTime(src.BirthStr) - dst.CreateTime = utils.UnixSecondToTime(int64(src.CreateTime)) - + dst.Birth = utils.UnixSecondToTime(pb.Birthday) + dst.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime)) return dst, nil } -func (db *DBUser) convert() (*PBUser, error) { - dst := &sdk.GroupRequest{} + +func (db *DBUser) convert() (*sdk.UserInfo, error) { + dst := &sdk.UserInfo{} utils.CopyStructFields(dst, db) - dst.ReqTime = uint32(db.ReqTime.Unix()) - dst.HandleTime = uint32(db.HandledTime.Unix()) + dst.CreateTime = uint32(db.CreateTime.Unix()) + dst.Birthday = db.Birth.Unix() return dst, nil } -func UserOpenIMCopyDB(dst *imdb.User, src *sdk.UserInfo) { - utils.CopyStructFields(dst, src) - dst.Birth, _ = utils.TimeStringToTime(src.BirthStr) - dst.CreateTime = utils.UnixSecondToTime(int64(src.CreateTime)) -} - -func UserDBCopyOpenIM(dst *open_im_sdk.UserInfo, src *imdb.User) { - utils.CopyStructFields(dst, src) - dst.CreateTime = uint32(src.CreateTime.Unix()) - //dst.Birth = uint32(src.Birth.Unix()) - dst.BirthStr = utils2.TimeToString(src.Birth) -} - -func UserDBCopyOpenIMPublicUser(dst *open_im_sdk.PublicUserInfo, src *imdb.User) { - utils.CopyStructFields(dst, src) +func (db *DBUser) convertPublic() (*sdk.PublicUserInfo, error) { + dst := &sdk.PublicUserInfo{} + utils.CopyStructFields(dst, db) + return dst, nil } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index ec439ebcd..04f4837b4 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -15,14 +15,22 @@ const GroupExpireTime = time.Second * 60 * 60 * 12 const groupInfoCacheKey = "GROUP_INFO_CACHE:" type GroupCache struct { - Client *Client - db *mysql.Group - expireTime time.Duration + db mysql.GroupModelInterface + expireTime time.Duration + redisClient *RedisClient + rcClient *rockscache.Client } -func NewGroupRc(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) GroupCache { - rcClient := newClient(rdb, opts) - return GroupCache{Client: rcClient, expireTime: GroupExpireTime} +func NewGroupCache(rdb redis.UniversalClient, db mysql.GroupModelInterface, opts rockscache.Options) *GroupCache { + rcClient := &rockscache.Client{ + Options: rockscache.Options{}, + } + redisClient := NewRedisClient(rdb) + return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient} +} + +func (g *GroupCache) getRedisClient() *RedisClient { + return g.redisClient } func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { @@ -52,7 +60,7 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string) defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) }() - groupStr, err := g.Client.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) + groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) if err != nil { return nil, utils.Wrap(err, "") } @@ -64,7 +72,16 @@ func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string) defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) }() - return g.Client.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) + return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) +} + +func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error { + for _, groupID := range groupIDs { + if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil { + return err + } + } + return nil } func (g *GroupCache) getGroupInfoCacheKey(groupID string) string { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 649bf5a3d..ddb568e23 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -42,9 +42,10 @@ const ( exTypeKeyLocker = "EX_LOCK:" ) -func InitRedis(ctx context.Context) go_redis.UniversalClient { +func InitRedis() go_redis.UniversalClient { var rdb go_redis.UniversalClient var err error + ctx := context.Background() if config.Config.Redis.EnableCluster { rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ Addrs: config.Config.Redis.DBAddress, @@ -73,14 +74,14 @@ func InitRedis(ctx context.Context) go_redis.UniversalClient { return rdb } -func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient { - return &RedisClient{rdb: rdb} -} - type RedisClient struct { rdb go_redis.UniversalClient } +func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient { + return &RedisClient{rdb: rdb} +} + func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) { key := accountTempCode + account n, err := r.rdb.Exists(context.Background(), key).Result() diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 2e5babf1b..52dd29480 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -11,6 +11,7 @@ import ( "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" + "gorm.io/gorm" "math/big" "sort" "strconv" @@ -18,11 +19,11 @@ import ( ) const ( - userInfoCache = "USER_INFO_CACHE:" - friendRelationCache = "FRIEND_RELATION_CACHE:" - blackListCache = "BLACK_LIST_CACHE:" - groupCache = "GROUP_CACHE:" - groupInfoCache = "GROUP_INFO_CACHE:" + userInfoCache = "USER_INFO_CACHE:" + friendRelationCache = "FRIEND_RELATION_CACHE:" + blackListCache = "BLACK_LIST_CACHE:" + groupCache = "GROUP_CACHE:" + //groupInfoCache = "GROUP_INFO_CACHE:" groupOwnerIDCache = "GROUP_OWNER_ID:" joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" @@ -38,19 +39,10 @@ const ( ) const scanCount = 3000 - -type RcClient struct { - rdb redis.UniversalClient - Cache *rockscache.Client - ExpireTime time.Duration -} - -func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient { - return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime} -} +const RandomExpireAdjustment = 0.2 func (rc *RcClient) DelKeys() { - for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, + for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache, groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { fName := utils.GetSelfFuncName() var cursor uint64 @@ -80,7 +72,7 @@ func (rc *RcClient) DelKeys() { } } -func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { +func (rc *Client) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { getFriendIDList := func() (string, error) { friendIDList, err := mysql.GetFriendIDListByUserID(userID) if err != nil { @@ -364,36 +356,36 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID) } -func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { - getGroupInfo := func() (string, error) { - groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) - if err != nil { - return "", utils.Wrap(err, "") - } - bytes, err := json.Marshal(groupInfo) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - groupInfo = &mysql.Group{} - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) - }() - groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = json.Unmarshal([]byte(groupInfoStr), groupInfo) - return groupInfo, utils.Wrap(err, "") -} - -func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) - }() - return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) -} +//func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { +// getGroupInfo := func() (string, error) { +// groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) +// if err != nil { +// return "", utils.Wrap(err, "") +// } +// bytes, err := json.Marshal(groupInfo) +// if err != nil { +// return "", utils.Wrap(err, "") +// } +// return string(bytes), nil +// } +// groupInfo = &mysql.Group{} +// defer func() { +// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) +// }() +// groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) +// if err != nil { +// return nil, utils.Wrap(err, "") +// } +// err = json.Unmarshal([]byte(groupInfoStr), groupInfo) +// return groupInfo, utils.Wrap(err, "") +//} +// +//func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { +// defer func() { +// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) +// }() +// return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) +//} func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) { getAllFriendInfo := func() (string, error) { @@ -616,7 +608,7 @@ func DelConversationFromCache(ctx context.Context, ownerUserID, conversationID s return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") } -func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongo.ExtendMsg, err error) { +func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongoDB.ExtendMsg, err error) { getExtendMsg := func() (string, error) { extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime) if err != nil { @@ -636,7 +628,7 @@ func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clien if err != nil { return nil, utils.Wrap(err, "Fetch failed") } - extendMsg = &mongo.ExtendMsg{} + extendMsg = &mongoDB.ExtendMsg{} err = json.Unmarshal([]byte(extendMsgStr), extendMsg) return extendMsg, utils.Wrap(err, "Unmarshal failed") } diff --git a/pkg/common/db/model/group.go b/pkg/common/db/model/group.go index dd878ad98..c1c2a1d69 100644 --- a/pkg/common/db/model/group.go +++ b/pkg/common/db/model/group.go @@ -2,42 +2,42 @@ package model import ( "Open_IM/pkg/common/db/cache" - "Open_IM/pkg/common/db/mongo" + "Open_IM/pkg/common/db/mongoDB" "Open_IM/pkg/common/db/mysql" - "Open_IM/pkg/common/trace_log" - "Open_IM/pkg/utils" "context" - "encoding/json" - //"github.com/dtm-labs/rockscache" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/mongo" "gorm.io/gorm" //"time" ) -type GroupModel struct { - db *mysql.Group - cache *cache.GroupCache - mongo *mongo.Client +type GroupInterface interface { + Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) + Create(ctx context.Context, groups []*mysql.Group) error + Delete(ctx context.Context, groupIDs []string) error + Take(ctx context.Context, groupID string) (group *mysql.Group, err error) } +type GroupModel struct { + db mysql.GroupModelInterface + cache *cache.GroupCache + mongo *mongoDB.Client +} -func NewGroupModel() { +func NewGroupModel(db mysql.GroupModelInterface, rdb redis.UniversalClient, mdb *mongo.Client) *GroupModel { var groupModel GroupModel - redisClient := cache.InitRedis() - rdb := cache.NewRedisClient(redisClient) - groupModel.db = mysql.NewGroupDB() - //mgo := mongo.In() + groupModel.db = db + groupModel.cache = cache.NewGroupCache(rdb, db, rockscache.Options{ + DisableCacheRead: false, + StrongConsistency: true, + }) + groupModel.mongo = mongoDB.NewMongoClient(mdb) + return &groupModel } func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { - g.cache.Client. - for _, groupID := range groupIDs { - group, err := g.getGroupInfoFromCache(ctx, groupID) - if err != nil { - return nil, err - } - groups = append(groups, group) - } - return groups, nil + return g.cache.GetGroupsInfoFromCache(ctx, groupIDs) } func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error { @@ -49,7 +49,7 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { if err := g.db.Delete(ctx, groupIDs, tx); err != nil { return err } - if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil { + if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil { return err } return nil @@ -57,35 +57,6 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { return err } -func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error { - for _, groupID := range groupIDs { - if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil { - return err - } - } - return nil -} - -func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { - getGroupInfo := func() (string, error) { - groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) - if err != nil { - return "", utils.Wrap(err, "") - } - bytes, err := json.Marshal(groupInfo) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - groupInfo = &mysql.Group{} - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) - }() - groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = json.Unmarshal([]byte(groupInfoStr), groupInfo) - return groupInfo, utils.Wrap(err, "") +func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) { + return g.cache.GetGroupInfoFromCache(ctx, groupID) } diff --git a/pkg/common/db/model/user.go b/pkg/common/db/model/user.go index 8a88b99cf..26ea4b2a3 100644 --- a/pkg/common/db/model/user.go +++ b/pkg/common/db/model/user.go @@ -9,9 +9,10 @@ type UserModel struct { db *mysql.User } -func NewGroupUser(ctx context.Context) { +func NewGroupUser(ctx context.Context) *UserModel { var userModel UserModel userModel.db = mysql.NewUserDB() + return &userModel } func (u *UserModel) Find(ctx context.Context, userIDs []string) (users []*mysql.User, err error) { diff --git a/pkg/common/db/mongo/office.go b/pkg/common/db/mongo/office.go deleted file mode 100644 index af2a1bf5b..000000000 --- a/pkg/common/db/mongo/office.go +++ /dev/null @@ -1 +0,0 @@ -package mongo diff --git a/pkg/common/db/mongo/batch_insert_chat.go b/pkg/common/db/mongoDB/batch_insert_chat.go similarity index 99% rename from pkg/common/db/mongo/batch_insert_chat.go rename to pkg/common/db/mongoDB/batch_insert_chat.go index e81e3e800..c2e5ca33e 100644 --- a/pkg/common/db/mongo/batch_insert_chat.go +++ b/pkg/common/db/mongoDB/batch_insert_chat.go @@ -1,4 +1,4 @@ -package mongo +package mongoDB import ( "Open_IM/pkg/common/config" diff --git a/pkg/common/db/mongo/extend_msg_mongo_model.go b/pkg/common/db/mongoDB/extend_msg_mongo_model.go similarity index 99% rename from pkg/common/db/mongo/extend_msg_mongo_model.go rename to pkg/common/db/mongoDB/extend_msg_mongo_model.go index 582c43f96..1ca56bc59 100644 --- a/pkg/common/db/mongo/extend_msg_mongo_model.go +++ b/pkg/common/db/mongoDB/extend_msg_mongo_model.go @@ -1,4 +1,4 @@ -package mongo +package mongoDB import ( "Open_IM/pkg/common/config" diff --git a/pkg/common/db/mongo/init_mongo.go b/pkg/common/db/mongoDB/init_mongo.go similarity index 93% rename from pkg/common/db/mongo/init_mongo.go rename to pkg/common/db/mongoDB/init_mongo.go index b61991506..9333e75e9 100644 --- a/pkg/common/db/mongo/init_mongo.go +++ b/pkg/common/db/mongoDB/init_mongo.go @@ -1,4 +1,4 @@ -package mongo +package mongoDB import ( "Open_IM/pkg/common/config" @@ -13,7 +13,15 @@ import ( "time" ) -func InitMongoClient() *mongo.Client { +type Client struct { + mongo *mongo.Client +} + +func NewMongoClient(mdb *mongo.Client) *Client { + return &Client{mongo: mdb} +} + +func initMongo() *mongo.Client { uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" if config.Config.Mongo.DBUri != "" { // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize @@ -50,6 +58,14 @@ func InitMongoClient() *mongo.Client { panic(err.Error() + " mongo.Connect failed " + uri) } } + return mongoClient +} + +func GetCollection(mongoClient *mongo.Client) { + +} + +func CreateAllIndex(mongoClient *mongo.Client) { // mongodb create index if err := createMongoIndex(mongoClient, cSendLog, false, "send_id", "-send_time"); err != nil { panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time") @@ -72,7 +88,6 @@ func InitMongoClient() *mongo.Client { if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil { panic(err.Error() + "index create failed " + cTag + " tag_id") } - return mongoClient } func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error { diff --git a/pkg/common/db/mongo/mongo_model.go b/pkg/common/db/mongoDB/mongo_model.go similarity index 99% rename from pkg/common/db/mongo/mongo_model.go rename to pkg/common/db/mongoDB/mongo_model.go index 1f46b3156..3b6b7aa27 100644 --- a/pkg/common/db/mongo/mongo_model.go +++ b/pkg/common/db/mongoDB/mongo_model.go @@ -1,4 +1,4 @@ -package mongo +package mongoDB import ( "Open_IM/pkg/common/config" @@ -1091,8 +1091,7 @@ func (d *db.DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, us } type SuperGroup struct { - GroupID string `bson:"group_id" json:"groupID"` - //MemberNumCount int `bson:"member_num_count"` + GroupID string `bson:"group_id" json:"groupID"` MemberIDList []string `bson:"member_id_list" json:"memberIDList"` } diff --git a/pkg/common/db/mongoDB/office.go b/pkg/common/db/mongoDB/office.go new file mode 100644 index 000000000..9b5669132 --- /dev/null +++ b/pkg/common/db/mongoDB/office.go @@ -0,0 +1 @@ +package mongoDB diff --git a/pkg/common/db/mongoDB/super_group.go b/pkg/common/db/mongoDB/super_group.go new file mode 100644 index 000000000..a28289b80 --- /dev/null +++ b/pkg/common/db/mongoDB/super_group.go @@ -0,0 +1,11 @@ +package mongoDB + +type SuperGroup struct { + GroupID string `bson:"group_id" json:"groupID"` + MemberIDList []string `bson:"member_id_list" json:"memberIDList"` +} + +type UserToSuperGroup struct { + UserID string `bson:"user_id" json:"userID"` + GroupIDList []string `bson:"group_id_list" json:"groupIDList"` +} diff --git a/pkg/common/db/mysql/group_model_k.go b/pkg/common/db/mysql/group_model_k.go index b6797109a..6b595136b 100644 --- a/pkg/common/db/mysql/group_model_k.go +++ b/pkg/common/db/mysql/group_model_k.go @@ -8,6 +8,14 @@ import ( "time" ) +type GroupModelInterface interface { + Create(ctx context.Context, groups []*Group) (err error) + Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) + UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) + Update(ctx context.Context, groups []*Group) (err error) + Find(ctx context.Context, groupIDs []string) (groups []*Group, err error) + Take(ctx context.Context, groupID string) (group *Group, err error) +} type Group struct { GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"` GroupName string `gorm:"column:name;size:255" json:"groupName"` @@ -29,7 +37,8 @@ type Group struct { func NewGroupDB() *Group { var group Group - group.DB = initMysqlDB(&group) + db := ConnectToDB() + db = InitModel(db, &group) return &group } @@ -41,11 +50,11 @@ func (*Group) Create(ctx context.Context, groups []*Group) (err error) { return err } -func (*Group) Delete(ctx context.Context, groupIDs []string) (err error) { +func (g *Group) Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) }() - return utils.Wrap(GroupDB.Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "") + return utils.Wrap(getDBConn(g.DB, tx...).Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "") } func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) { diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index 35bf3c541..f6980613d 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -122,7 +122,7 @@ func RemoveDuplicateElement(idList []string) []string { return result } -func IsRepeatStringSlice(arr []string) bool { +func IsDuplicateStringSlice(arr []string) bool { t := make(map[string]struct{}) for _, s := range arr { if _, ok := t[s]; ok { @@ -133,6 +133,6 @@ func IsRepeatStringSlice(arr []string) bool { return false } -func IsRepeatID(args ...interface{}) bool { +func IsDuplicateID(args ...interface{}) bool { return false }