diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index ae491734a..043398422 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/mongo" "Open_IM/pkg/common/log" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -89,7 +90,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs msgs, err := db.DB.GetUserMsgListByIndex(ID, index) if err != nil || msgs.UID == "" { if err != nil { - if err == db.ErrMsgListNotExist { + if err == mongo.ErrMsgListNotExist { log.NewInfo(operationID, utils.GetSelfFuncName(), "ID:", ID, "index:", index, err.Error()) } else { log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) @@ -103,7 +104,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return delStruct.getSetMinSeq() + 1, nil } log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) - if len(msgs.Msg) > db.GetSingleGocMsgNum() { + if len(msgs.Msg) > mongo.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { @@ -149,7 +150,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return seq, utils.Wrap(err, "deleteMongoMsg failed") } -func msgListIsFull(chat *db.UserChat) bool { +func msgListIsFull(chat *mongo.UserChat) bool { index, _ := strconv.Atoi(strings.Split(chat.UID, ":")[1]) if index == 0 { if len(chat.Msg) >= 4999 { diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 88eac2df9..c0dda262d 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -2,7 +2,7 @@ package cronTask import ( "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" + mongo2 "Open_IM/pkg/common/db/mongo" server_api_params "Open_IM/pkg/proto/sdk_ws" "context" "fmt" @@ -22,8 +22,8 @@ var ( mongoClient *mongo.Collection ) -func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { - chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))} +func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *mongo2.UserChat { + chat := &mongo2.UserChat{UID: userID + strconv.Itoa(int(index))} for i := startSeq; i <= stopSeq; i++ { msg := server_api_params.MsgData{ SendID: "sendID1", @@ -45,7 +45,7 @@ func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.Use } bytes, _ := proto.Marshal(&msg) sendTime := 0 - chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) + chat.Msg = append(chat.Msg, mongo2.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) } return chat } @@ -54,7 +54,7 @@ func SetUserMaxSeq(userID string, seq int) error { return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() } -func CreateChat(userChat *db.UserChat) error { +func CreateChat(userChat *mongo2.UserChat) error { _, err := mongoClient.InsertOne(context.Background(), userChat) return err } diff --git a/internal/msg_transfer/logic/modify_msg_handler.go b/internal/msg_transfer/logic/modify_msg_handler.go index 782c6fcc0..847bf8761 100644 --- a/internal/msg_transfer/logic/modify_msg_handler.go +++ b/internal/msg_transfer/logic/modify_msg_handler.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/mongo" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/msg" @@ -70,14 +71,14 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg } if !notification.IsReact { // first time to modify - var reactionExtensionList = make(map[string]db.KeyValue) - extendMsg := db.ExtendMsg{ + var reactionExtensionList = make(map[string]mongo.KeyValue) + extendMsg := mongo.ExtendMsg{ ReactionExtensionList: reactionExtensionList, ClientMsgID: notification.ClientMsgID, MsgFirstModifyTime: notification.MsgFirstModifyTime, } for _, v := range notification.SuccessReactionExtensionList { - reactionExtensionList[v.TypeKey] = db.KeyValue{ + reactionExtensionList[v.TypeKey] = mongo.KeyValue{ TypeKey: v.TypeKey, Value: v.Value, LatestUpdateTime: v.LatestUpdateTime, diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index d4f3683fe..ce3938036 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/mongo" "Open_IM/pkg/common/db/mysql_model/im_mysql_model" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" @@ -296,7 +297,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR } } - var tagSendLogs db.TagSendLog + var tagSendLogs mongo.TagSendLog var wg sync.WaitGroup wg.Add(len(successUserIDList)) var lock sync.Mutex @@ -309,7 +310,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR return } lock.Lock() - tagSendLogs.UserList = append(tagSendLogs.UserList, db.TagUser{ + tagSendLogs.UserList = append(tagSendLogs.UserList, mongo.TagUser{ UserID: userID, UserName: userName, }) @@ -388,10 +389,10 @@ func (s *officeServer) GetUserTagByID(_ context.Context, req *pbOffice.GetUserTa func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.CreateOneWorkMomentReq) (resp *pbOffice.CreateOneWorkMomentResp, err error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp = &pbOffice.CreateOneWorkMomentResp{CommonResp: &pbOffice.CommonResp{}} - workMoment := db.WorkMoment{ - Comments: []*db.Comment{}, - LikeUserList: []*db.WorkMomentUser{}, - PermissionUserList: []*db.WorkMomentUser{}, + workMoment := mongo.WorkMoment{ + Comments: []*mongo.Comment{}, + LikeUserList: []*mongo.WorkMomentUser{}, + PermissionUserList: []*mongo.WorkMomentUser{}, } createUser, err := imdb.GetUserByUserID(req.WorkMoment.UserID) if err != nil { @@ -405,14 +406,14 @@ func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.Crea workMoment.UserName = createUser.Nickname workMoment.FaceURL = createUser.FaceURL workMoment.PermissionUserIDList = s.getPermissionUserIDList(req.OperationID, req.WorkMoment.PermissionGroupList, req.WorkMoment.PermissionUserList) - workMoment.PermissionUserList = []*db.WorkMomentUser{} + workMoment.PermissionUserList = []*mongo.WorkMomentUser{} for _, userID := range workMoment.PermissionUserIDList { userName, err := imdb.GetUserNameByUserID(userID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error()) continue } - workMoment.PermissionUserList = append(workMoment.PermissionUserList, &db.WorkMomentUser{ + workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongo.WorkMomentUser{ UserID: userID, UserName: userName, }) @@ -502,7 +503,7 @@ func (s *officeServer) DeleteOneWorkMoment(_ context.Context, req *pbOffice.Dele return resp, nil } -func isUserCanSeeWorkMoment(userID string, workMoment db.WorkMoment) bool { +func isUserCanSeeWorkMoment(userID string, workMoment mongo.WorkMoment) bool { if userID != workMoment.UserID { switch workMoment.Permission { case constant.WorkMomentPublic: @@ -569,7 +570,7 @@ func (s *officeServer) CommentOneWorkMoment(_ context.Context, req *pbOffice.Com return resp, nil } } - comment := &db.Comment{ + comment := &mongo.Comment{ UserID: req.UserID, UserName: commentUser.Nickname, ReplyUserID: req.ReplyUserID, @@ -643,7 +644,7 @@ func (s *officeServer) GetUserWorkMoments(_ context.Context, req *pbOffice.GetUs log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp = &pbOffice.GetUserWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}} resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber} - var workMoments []db.WorkMoment + var workMoments []mongo.WorkMoment if req.UserID == req.OpUserID { workMoments, err = db.DB.GetUserSelfWorkMoments(req.UserID, req.Pagination.ShowNumber, req.Pagination.PageNumber) } else { diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/cache/redis.go similarity index 54% rename from pkg/common/db/RedisModel.go rename to pkg/common/db/cache/redis.go index 29e2772a0..649bf5a3d 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/cache/redis.go @@ -1,4 +1,4 @@ -package db +package cache import ( "Open_IM/pkg/common/config" @@ -42,135 +42,175 @@ const ( exTypeKeyLocker = "EX_LOCK:" ) -func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { +func InitRedis(ctx context.Context) go_redis.UniversalClient { + var rdb go_redis.UniversalClient + var err error + if config.Config.Redis.EnableCluster { + rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ + Addrs: config.Config.Redis.DBAddress, + Username: config.Config.Redis.DBUserName, + Password: config.Config.Redis.DBPassWord, // no password set + PoolSize: 50, + }) + _, err = rdb.Ping(ctx).Result() + if err != nil { + fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress) + panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord) + } + } else { + rdb = go_redis.NewClient(&go_redis.Options{ + Addr: config.Config.Redis.DBAddress[0], + Username: config.Config.Redis.DBUserName, + Password: config.Config.Redis.DBPassWord, // no password set + DB: 0, // use default DB + PoolSize: 100, // 连接池大小 + }) + _, err = rdb.Ping(ctx).Result() + if err != nil { + panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord) + } + } + return rdb +} + +func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient { + return &RedisClient{rdb: rdb} +} + +type RedisClient struct { + rdb go_redis.UniversalClient +} + +func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) { key := accountTempCode + account - n, err := d.RDB.Exists(context.Background(), key).Result() + n, err := r.rdb.Exists(context.Background(), key).Result() if n > 0 { return true, err } else { return false, err } } -func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) { + +func (r *RedisClient) SetAccountCode(account string, code, ttl int) (err error) { key := accountTempCode + account - return d.RDB.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err() + return r.rdb.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err() } -func (d *DataBases) GetAccountCode(account string) (string, error) { +func (r *RedisClient) GetAccountCode(account string) (string, error) { key := accountTempCode + account - return d.RDB.Get(context.Background(), key).Result() + return r.rdb.Get(context.Background(), key).Result() } //Perform seq auto-increment operation of user messages -func (d *DataBases) IncrUserSeq(uid string) (uint64, error) { +func (r *RedisClient) IncrUserSeq(uid string) (uint64, error) { key := userIncrSeq + uid - seq, err := d.RDB.Incr(context.Background(), key).Result() + seq, err := r.rdb.Incr(context.Background(), key).Result() return uint64(seq), err } //Get the largest Seq -func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { +func (r *RedisClient) GetUserMaxSeq(uid string) (uint64, error) { key := userIncrSeq + uid - seq, err := d.RDB.Get(context.Background(), key).Result() + seq, err := r.rdb.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } //set the largest Seq -func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { +func (r *RedisClient) SetUserMaxSeq(uid string, maxSeq uint64) error { key := userIncrSeq + uid - return d.RDB.Set(context.Background(), key, maxSeq, 0).Err() + return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() } //Set the user's minimum seq -func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { +func (r *RedisClient) SetUserMinSeq(uid string, minSeq uint32) (err error) { key := userMinSeq + uid - return d.RDB.Set(context.Background(), key, minSeq, 0).Err() + return r.rdb.Set(context.Background(), key, minSeq, 0).Err() } //Get the smallest Seq -func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { +func (r *RedisClient) GetUserMinSeq(uid string) (uint64, error) { key := userMinSeq + uid - seq, err := d.RDB.Get(context.Background(), key).Result() + seq, err := r.rdb.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } -func (d *DataBases) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) { +func (r *RedisClient) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) { key := groupUserMinSeq + "g:" + groupID + "u:" + userID - return d.RDB.Set(context.Background(), key, minSeq, 0).Err() + return r.rdb.Set(context.Background(), key, minSeq, 0).Err() } -func (d *DataBases) GetGroupUserMinSeq(groupID, userID string) (uint64, error) { +func (r *RedisClient) GetGroupUserMinSeq(groupID, userID string) (uint64, error) { key := groupUserMinSeq + "g:" + groupID + "u:" + userID - seq, err := d.RDB.Get(context.Background(), key).Result() + seq, err := r.rdb.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } -func (d *DataBases) GetGroupMaxSeq(groupID string) (uint64, error) { +func (r *RedisClient) GetGroupMaxSeq(groupID string) (uint64, error) { key := groupMaxSeq + groupID - seq, err := d.RDB.Get(context.Background(), key).Result() + seq, err := r.rdb.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } -func (d *DataBases) IncrGroupMaxSeq(groupID string) (uint64, error) { +func (r *RedisClient) IncrGroupMaxSeq(groupID string) (uint64, error) { key := groupMaxSeq + groupID - seq, err := d.RDB.Incr(context.Background(), key).Result() + seq, err := r.rdb.Incr(context.Background(), key).Result() return uint64(seq), err } -func (d *DataBases) SetGroupMaxSeq(groupID string, maxSeq uint64) error { +func (r *RedisClient) SetGroupMaxSeq(groupID string, maxSeq uint64) error { key := groupMaxSeq + groupID - return d.RDB.Set(context.Background(), key, maxSeq, 0).Err() + return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() } -func (d *DataBases) SetGroupMinSeq(groupID string, minSeq uint32) error { +func (r *RedisClient) SetGroupMinSeq(groupID string, minSeq uint32) error { key := groupMinSeq + groupID - return d.RDB.Set(context.Background(), key, minSeq, 0).Err() + return r.rdb.Set(context.Background(), key, minSeq, 0).Err() } //Store userid and platform class to redis -func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { +func (r *RedisClient) AddTokenFlag(userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) log2.NewDebug("", "add token key is ", key) - return d.RDB.HSet(context.Background(), key, token, flag).Err() + return r.rdb.HSet(context.Background(), key, token, flag).Err() } -func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { +func (r *RedisClient) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { key := uidPidToken + userID + ":" + platformID log2.NewDebug("", "get token key is ", key) - m, err := d.RDB.HGetAll(context.Background(), key).Result() + m, err := r.rdb.HGetAll(context.Background(), key).Result() mm := make(map[string]int) for k, v := range m { mm[k] = utils.StringToInt(v) } return mm, err } -func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { +func (r *RedisClient) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) mm := make(map[string]interface{}) for k, v := range m { mm[k] = v } - return d.RDB.HSet(context.Background(), key, mm).Err() + return r.rdb.HSet(context.Background(), key, mm).Err() } -func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { +func (r *RedisClient) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return d.RDB.HDel(context.Background(), key, fields...).Err() + return r.rdb.HDel(context.Background(), key, fields...).Err() } -func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { +func (r *RedisClient) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { key := conversationReceiveMessageOpt + userID - return d.RDB.HSet(context.Background(), key, conversationID, opt).Err() + return r.rdb.HSet(context.Background(), key, conversationID, opt).Err() } -func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { +func (r *RedisClient) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { key := conversationReceiveMessageOpt + userID - result, err := d.RDB.HGet(context.Background(), key, conversationID).Result() + result, err := r.rdb.HGet(context.Background(), key, conversationID).Result() return utils.StringToInt(result), err } -func (d *DataBases) SetUserGlobalMsgRecvOpt(userID string, opt int32) error { +func (r *RedisClient) SetUserGlobalMsgRecvOpt(userID string, opt int32) error { key := conversationReceiveMessageOpt + userID - return d.RDB.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err() + return r.rdb.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err() } -func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) { +func (r *RedisClient) GetUserGlobalMsgRecvOpt(userID string) (int, error) { key := conversationReceiveMessageOpt + userID - result, err := d.RDB.HGet(context.Background(), key, GlobalMsgRecvOpt).Result() + result, err := r.rdb.HGet(context.Background(), key, GlobalMsgRecvOpt).Result() if err != nil { if err == go_redis.Nil { return 0, nil @@ -180,11 +220,11 @@ func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) { } return utils.StringToInt(result), err } -func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { +func (r *RedisClient) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := d.RDB.Get(context.Background(), key).Result() + result, err := r.rdb.Get(context.Background(), key).Result() if err != nil { errResult = err failedSeqList = append(failedSeqList, v) @@ -206,9 +246,9 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati return seqMsg, failedSeqList, errResult } -func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) { +func (r *RedisClient) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) { ctx := context.Background() - pipe := d.RDB.Pipeline() + pipe := r.rdb.Pipeline() var failedList []pbChat.MsgDataToMQ for _, msg := range msgList { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) @@ -219,7 +259,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, } log2.NewDebug(operationID, "convert string is ", s) err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - //err = d.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() + //err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() if err != nil { log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error()) failedList = append(failedList, *msg) @@ -231,11 +271,11 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, _, err := pipe.Exec(ctx) return err, 0 } -func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { +func (r *RedisClient) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { ctx := context.Background() for _, msg := range msgList { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) - err := d.RDB.Del(ctx, key).Err() + err := r.rdb.Del(ctx, key).Err() if err != nil { log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, uid, err.Error(), msgList) } @@ -243,10 +283,10 @@ func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid st return nil } -func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { +func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { ctx := context.Background() key := messageCache + userID + "_" + "*" - vals, err := d.RDB.Keys(ctx, key).Result() + vals, err := r.rdb.Keys(ctx, key).Result() log2.Debug(operationID, "vals: ", vals) if err == go_redis.Nil { return nil @@ -255,12 +295,12 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str return utils.Wrap(err, "") } for _, v := range vals { - err = d.RDB.Del(ctx, v).Err() + err = r.rdb.Del(ctx, v).Err() } return nil } -func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) { +func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) { req := &pbRtc.SignalReq{} if err := proto.Unmarshal(msg.Content, req); err != nil { return false, err @@ -293,16 +333,16 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, return false, err } keyList := SignalListCache + userID - err = d.RDB.LPush(context.Background(), keyList, msg.ClientMsgID).Err() + err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err() if err != nil { return false, err } - err = d.RDB.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() + err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() if err != nil { return false, err } key := SignalCache + msg.ClientMsgID - err = d.RDB.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() + err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() if err != nil { return false, err } @@ -311,10 +351,10 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, return true, nil } -func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { +func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { key := SignalCache + clientMsgID invitationInfo = &pbRtc.SignalInviteReq{} - bytes, err := d.RDB.Get(context.Background(), key).Bytes() + bytes, err := r.rdb.Get(context.Background(), key).Bytes() if err != nil { return nil, err } @@ -333,9 +373,9 @@ func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (inv return invitationInfo, err } -func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { +func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { keyList := SignalListCache + userID - result := d.RDB.LPop(context.Background(), keyList) + result := r.rdb.LPop(context.Background(), keyList) if err = result.Err(); err != nil { return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed") } @@ -344,27 +384,27 @@ func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationI return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed") } log2.NewDebug("", utils.GetSelfFuncName(), result, result.String()) - invitationInfo, err = d.GetSignalInfoFromCacheByClientMsgID(key) + invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(key) if err != nil { return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID") } - err = d.DelUserSignalList(userID) + err = r.DelUserSignalList(userID) if err != nil { return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID") } return invitationInfo, nil } -func (d *DataBases) DelUserSignalList(userID string) error { +func (r *RedisClient) DelUserSignalList(userID string) error { keyList := SignalListCache + userID - err := d.RDB.Del(context.Background(), keyList).Err() + err := r.rdb.Del(context.Background(), keyList).Err() return err } -func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { +func (r *RedisClient) DelMsgFromCache(uid string, seqList []uint32, operationID string) { for _, seq := range seqList { key := messageCache + uid + "_" + strconv.Itoa(int(seq)) - result, err := d.RDB.Get(context.Background(), key).Result() + result, err := r.rdb.Get(context.Background(), key).Result() if err != nil { if err == go_redis.Nil { log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil") @@ -384,36 +424,36 @@ func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID st log2.Error(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg, err.Error()) continue } - if err := d.RDB.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { log2.Error(operationID, utils.GetSelfFuncName(), "Set failed", err.Error()) } } } -func (d *DataBases) SetGetuiToken(token string, expireTime int64) error { - return d.RDB.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() +func (r *RedisClient) SetGetuiToken(token string, expireTime int64) error { + return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() } -func (d *DataBases) GetGetuiToken() (string, error) { - result, err := d.RDB.Get(context.Background(), getuiToken).Result() +func (r *RedisClient) GetGetuiToken() (string, error) { + result, err := r.rdb.Get(context.Background(), getuiToken).Result() return result, err } -func (d *DataBases) SetGetuiTaskID(taskID string, expireTime int64) error { - return d.RDB.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err() +func (r *RedisClient) SetGetuiTaskID(taskID string, expireTime int64) error { + return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err() } -func (d *DataBases) GetGetuiTaskID() (string, error) { - result, err := d.RDB.Get(context.Background(), getuiTaskID).Result() +func (r *RedisClient) GetGetuiTaskID() (string, error) { + result, err := r.rdb.Get(context.Background(), getuiTaskID).Result() return result, err } -func (d *DataBases) SetSendMsgStatus(status int32, operationID string) error { - return d.RDB.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err() +func (r *RedisClient) SetSendMsgStatus(status int32, operationID string) error { + return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err() } -func (d *DataBases) GetSendMsgStatus(operationID string) (int, error) { - result, err := d.RDB.Get(context.Background(), sendMsgFailedFlag+operationID).Result() +func (r *RedisClient) GetSendMsgStatus(operationID string) (int, error) { + result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result() if err != nil { return 0, err } @@ -421,36 +461,36 @@ func (d *DataBases) GetSendMsgStatus(operationID string) (int, error) { return status, err } -func (d *DataBases) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) { +func (r *RedisClient) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) { key := FcmToken + account + ":" + strconv.Itoa(platformID) - return d.RDB.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err() + return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err() } -func (d *DataBases) GetFcmToken(account string, platformID int) (string, error) { +func (r *RedisClient) GetFcmToken(account string, platformID int) (string, error) { key := FcmToken + account + ":" + strconv.Itoa(platformID) - return d.RDB.Get(context.Background(), key).Result() + return r.rdb.Get(context.Background(), key).Result() } -func (d *DataBases) DelFcmToken(account string, platformID int) error { +func (r *RedisClient) DelFcmToken(account string, platformID int) error { key := FcmToken + account + ":" + strconv.Itoa(platformID) - return d.RDB.Del(context.Background(), key).Err() + return r.rdb.Del(context.Background(), key).Err() } -func (d *DataBases) IncrUserBadgeUnreadCountSum(uid string) (int, error) { +func (r *RedisClient) IncrUserBadgeUnreadCountSum(uid string) (int, error) { key := userBadgeUnreadCountSum + uid - seq, err := d.RDB.Incr(context.Background(), key).Result() + seq, err := r.rdb.Incr(context.Background(), key).Result() return int(seq), err } -func (d *DataBases) SetUserBadgeUnreadCountSum(uid string, value int) error { +func (r *RedisClient) SetUserBadgeUnreadCountSum(uid string, value int) error { key := userBadgeUnreadCountSum + uid - return d.RDB.Set(context.Background(), key, value, 0).Err() + return r.rdb.Set(context.Background(), key, value, 0).Err() } -func (d *DataBases) GetUserBadgeUnreadCountSum(uid string) (int, error) { +func (r *RedisClient) GetUserBadgeUnreadCountSum(uid string) (int, error) { key := userBadgeUnreadCountSum + uid - seq, err := d.RDB.Get(context.Background(), key).Result() + seq, err := r.rdb.Get(context.Background(), key).Result() return utils.StringToInt(seq), err } -func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) { +func (r *RedisClient) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) { key := getMessageReactionExPrefix(clientMsgID, sessionType) - n, err := d.RDB.Exists(context.Background(), key).Result() + n, err := r.rdb.Exists(context.Background(), key).Result() if n > 0 { return true, err } else { @@ -458,38 +498,38 @@ func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType i } } -func (d *DataBases) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) { +func (r *RedisClient) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) { key := getMessageReactionExPrefix(clientMsgID, sessionType) - return d.RDB.HGetAll(context.Background(), key).Result() + return r.rdb.HGetAll(context.Background(), key).Result() } -func (d *DataBases) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error { +func (r *RedisClient) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error { key := getMessageReactionExPrefix(clientMsgID, sessionType) - return d.RDB.HDel(context.Background(), key, subKey).Err() + return r.rdb.HDel(context.Background(), key, subKey).Err() } -func (d *DataBases) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { +func (r *RedisClient) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { key := getMessageReactionExPrefix(clientMsgID, sessionType) - return d.RDB.Expire(context.Background(), key, expiration).Result() + return r.rdb.Expire(context.Background(), key, expiration).Result() } -func (d *DataBases) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) { +func (r *RedisClient) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) { key := getMessageReactionExPrefix(clientMsgID, sessionType) - result, err := d.RDB.HGet(context.Background(), key, typeKey).Result() + result, err := r.rdb.HGet(context.Background(), key, typeKey).Result() return result, err } -func (d *DataBases) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error { +func (r *RedisClient) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error { key := getMessageReactionExPrefix(clientMsgID, sessionType) - return d.RDB.HSet(context.Background(), key, typeKey, value).Err() + return r.rdb.HSet(context.Background(), key, typeKey, value).Err() } -func (d *DataBases) LockMessageTypeKey(clientMsgID string, TypeKey string) error { +func (r *RedisClient) LockMessageTypeKey(clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return d.RDB.SetNX(context.Background(), key, 1, time.Minute).Err() + return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err() } -func (d *DataBases) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error { +func (r *RedisClient) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey - return d.RDB.Del(context.Background(), key).Err() + return r.rdb.Del(context.Background(), key).Err() } diff --git a/pkg/common/db/redisModel_test.go b/pkg/common/db/cache/redisModel_test.go similarity index 99% rename from pkg/common/db/redisModel_test.go rename to pkg/common/db/cache/redisModel_test.go index 7c1cf9f60..d748a25df 100644 --- a/pkg/common/db/redisModel_test.go +++ b/pkg/common/db/cache/redisModel_test.go @@ -1,4 +1,4 @@ -package db +package cache import ( "Open_IM/pkg/common/constant" diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/cache/rockscache.go similarity index 88% rename from pkg/common/db/rocks_cache/rocks_cache.go rename to pkg/common/db/cache/rockscache.go index 5d8a4d812..2e5babf1b 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/cache/rockscache.go @@ -1,15 +1,16 @@ -package rocksCache +package cache import ( "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/common/db/mongo" + "Open_IM/pkg/common/db/mysql" "Open_IM/pkg/common/log" "Open_IM/pkg/common/trace_log" "Open_IM/pkg/utils" "context" "encoding/json" - "fmt" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" "math/big" "sort" "strconv" @@ -36,8 +37,19 @@ const ( extendMsgCache = "EXTEND_MSG_CACHE:" ) -func DelKeys() { - fmt.Println("cache init to del old keys") +const scanCount = 3000 + +type RcClient struct { + rdb redis.UniversalClient + Cache *rockscache.Client + ExpireTime time.Duration +} + +func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient { + return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime} +} + +func (rc *RcClient) DelKeys() { for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { fName := utils.GetSelfFuncName() @@ -46,16 +58,16 @@ func DelKeys() { for { var keys []string var err error - keys, cursor, err = db.DB.RDB.Scan(context.Background(), cursor, key+"*", 3000).Result() + keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() if err != nil { panic(err.Error()) } n += len(keys) // for each for redis cluster for _, key := range keys { - if err = db.DB.RDB.Del(context.Background(), key).Err(); err != nil { + if err = rc.rdb.Del(context.Background(), key).Err(); err != nil { log.NewError("", fName, key, err.Error()) - err = db.DB.RDB.Del(context.Background(), key).Err() + err = rc.rdb.Del(context.Background(), key).Err() if err != nil { panic(err.Error()) } @@ -68,9 +80,9 @@ func DelKeys() { } } -func GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { +func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { getFriendIDList := func() (string, error) { - friendIDList, err := imdb.GetFriendIDListByUserID(userID) + friendIDList, err := mysql.GetFriendIDListByUserID(userID) if err != nil { return "", utils.Wrap(err, "") } @@ -100,7 +112,7 @@ func DelFriendIDListFromCache(ctx context.Context, userID string) (err error) { func GetBlackListFromCache(ctx context.Context, userID string) (blackIDs []string, err error) { getBlackIDList := func() (string, error) { - blackIDs, err := imdb.GetBlackIDListByUserID(userID) + blackIDs, err := mysql.GetBlackIDListByUserID(userID) if err != nil { return "", utils.Wrap(err, "") } @@ -130,7 +142,7 @@ func DelBlackIDListFromCache(ctx context.Context, userID string) (err error) { func GetJoinedGroupIDListFromCache(ctx context.Context, userID string) (joinedGroupList []string, err error) { getJoinedGroupIDList := func() (string, error) { - joinedGroupList, err := imdb.GetJoinedGroupIDListByUserID(userID) + joinedGroupList, err := mysql.GetJoinedGroupIDListByUserID(userID) if err != nil { return "", utils.Wrap(err, "") } @@ -172,7 +184,7 @@ func GetGroupMemberIDListFromCache(ctx context.Context, groupID string) (groupMe } groupMemberIDList = superGroup.MemberIDList } else { - groupMemberIDList, err = imdb.GetGroupMemberIDListByGroupID(groupID) + groupMemberIDList, err = mysql.GetGroupMemberIDListByGroupID(groupID) if err != nil { return "", utils.Wrap(err, "") } @@ -201,9 +213,9 @@ func DelGroupMemberIDListFromCache(ctx context.Context, groupID string) (err err return db.DB.Rc.TagAsDeleted(groupCache + groupID) } -func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *imdb.User, err error) { +func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *mysql.User, err error) { getUserInfo := func() (string, error) { - userInfo, err := imdb.GetUserByUserID(userID) + userInfo, err := mysql.GetUserByUserID(userID) if err != nil { return "", utils.Wrap(err, "") } @@ -220,13 +232,13 @@ func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *imdb.Us if err != nil { return nil, utils.Wrap(err, "") } - userInfo = &imdb.User{} + userInfo = &mysql.User{} err = json.Unmarshal([]byte(userInfoStr), userInfo) return userInfo, utils.Wrap(err, "") } -func GetUserInfoFromCacheBatch(ctx context.Context, userIDs []string) ([]*imdb.User, error) { - var users []*imdb.User +func GetUserInfoFromCacheBatch(ctx context.Context, userIDs []string) ([]*mysql.User, error) { + var users []*mysql.User for _, userID := range userIDs { user, err := GetUserInfoFromCache(ctx, userID) if err != nil { @@ -244,9 +256,9 @@ func DelUserInfoFromCache(ctx context.Context, userID string) (err error) { return db.DB.Rc.TagAsDeleted(userInfoCache + userID) } -func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (groupMember *imdb.GroupMember, err error) { +func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (groupMember *mysql.GroupMember, err error) { getGroupMemberInfo := func() (string, error) { - groupMemberInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID) + groupMemberInfo, err := mysql.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID) if err != nil { return "", utils.Wrap(err, "") } @@ -263,7 +275,7 @@ func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (g if err != nil { return nil, utils.Wrap(err, "") } - groupMember = &imdb.GroupMember{} + groupMember = &mysql.GroupMember{} err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember) return groupMember, utils.Wrap(err, "") } @@ -275,7 +287,7 @@ func DelGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (e return db.DB.Rc.TagAsDeleted(groupMemberInfoCache + groupID + "-" + userID) } -func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, groupID string) (groupMembers []*imdb.GroupMember, err error) { +func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, groupID string) (groupMembers []*mysql.GroupMember, err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers) }() @@ -286,7 +298,7 @@ func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, grou if count < 0 || offset < 0 { return nil, nil } - var groupMemberList []*imdb.GroupMember + var groupMemberList []*mysql.GroupMember var start, stop int32 start = offset stop = offset + count @@ -322,12 +334,12 @@ func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, grou return groupMemberList, nil } -func GetAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (groupMembers []*imdb.GroupMember, err error) { +func GetAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (groupMembers []*mysql.GroupMember, err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMembers", groupMembers) }() getGroupMemberInfo := func() (string, error) { - groupMembers, err := imdb.GetGroupMemberListByGroupID(groupID) + groupMembers, err := mysql.GetGroupMemberListByGroupID(groupID) if err != nil { return "", utils.Wrap(err, "") } @@ -352,9 +364,9 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID) } -func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *imdb.Group, err error) { +func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { getGroupInfo := func() (string, error) { - groupInfo, err := imdb.GetGroupInfoByGroupID(groupID) + groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) if err != nil { return "", utils.Wrap(err, "") } @@ -364,7 +376,7 @@ func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *imdb } return string(bytes), nil } - groupInfo = &imdb.Group{} + groupInfo = &mysql.Group{} defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) }() @@ -383,9 +395,9 @@ func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) } -func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*imdb.Friend, err error) { +func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) { getAllFriendInfo := func() (string, error) { - friendInfoList, err := imdb.GetFriendListByUserID(userID) + friendInfoList, err := mysql.GetFriendListByUserID(userID) if err != nil { return "", utils.Wrap(err, "") } @@ -485,7 +497,7 @@ func DelGroupMemberListHashFromCache(ctx context.Context, groupID string) (err e func GetGroupMemberNumFromCache(ctx context.Context, groupID string) (num int, err error) { getGroupMemberNum := func() (string, error) { - num, err := imdb.GetGroupMemberNumByGroupID(groupID) + num, err := mysql.GetGroupMemberNumByGroupID(groupID) if err != nil { return "", utils.Wrap(err, "") } @@ -510,7 +522,7 @@ func DelGroupMemberNumFromCache(ctx context.Context, groupID string) (err error) func GetUserConversationIDListFromCache(ctx context.Context, userID string) (conversationIDs []string, err error) { getConversationIDList := func() (string, error) { - conversationIDList, err := imdb.GetConversationIDListByUserID(userID) + conversationIDList, err := mysql.GetConversationIDListByUserID(userID) if err != nil { return "", utils.Wrap(err, "getConversationIDList failed") } @@ -539,9 +551,9 @@ func DelUserConversationIDListFromCache(ctx context.Context, userID string) (err return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err") } -func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID string) (conversation *imdb.Conversation, err error) { +func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID string) (conversation *mysql.Conversation, err error) { getConversation := func() (string, error) { - conversation, err := imdb.GetConversation(ownerUserID, conversationID) + conversation, err := mysql.GetConversation(ownerUserID, conversationID) if err != nil { return "", utils.Wrap(err, "get failed") } @@ -558,12 +570,12 @@ func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID s if err != nil { return nil, utils.Wrap(err, "Fetch failed") } - conversation = &imdb.Conversation{} + conversation = &mysql.Conversation{} err = json.Unmarshal([]byte(conversationStr), &conversation) return conversation, utils.Wrap(err, "Unmarshal failed") } -func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []imdb.Conversation, err error) { +func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []mysql.Conversation, err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) }() @@ -577,7 +589,7 @@ func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversa return conversations, nil } -func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conversations []imdb.Conversation, err error) { +func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conversations []mysql.Conversation, err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) }() @@ -585,7 +597,7 @@ func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conver if err != nil { return nil, err } - var conversationList []imdb.Conversation + var conversationList []mysql.Conversation log.NewDebug("", utils.GetSelfFuncName(), IDList) for _, conversationID := range IDList { conversation, err := GetConversationFromCache(ctx, ownerUserID, conversationID) @@ -604,7 +616,7 @@ func DelConversationFromCache(ctx context.Context, ownerUserID, conversationID s return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") } -func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *db.ExtendMsg, err error) { +func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongo.ExtendMsg, err error) { getExtendMsg := func() (string, error) { extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime) if err != nil { @@ -624,7 +636,7 @@ func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clien if err != nil { return nil, utils.Wrap(err, "Fetch failed") } - extendMsg = &db.ExtendMsg{} + extendMsg = &mongo.ExtendMsg{} err = json.Unmarshal([]byte(extendMsgStr), extendMsg) return extendMsg, utils.Wrap(err, "Unmarshal failed") } diff --git a/pkg/common/db/model/group.go b/pkg/common/db/model/group.go new file mode 100644 index 000000000..7547940c6 --- /dev/null +++ b/pkg/common/db/model/group.go @@ -0,0 +1,111 @@ +package model + +import ( + "Open_IM/pkg/common/db/cache" + "Open_IM/pkg/common/db/mysql" + "Open_IM/pkg/common/trace_log" + "Open_IM/pkg/utils" + "context" + "encoding/json" + "github.com/dtm-labs/rockscache" + "gorm.io/gorm" + "time" +) + +type GroupModel struct { + strongRc *cache.RcClient + weakRc *cache.RcClient + db *mysql.Group + rdb *cache.RedisClient +} + +const GroupExpireTime = time.Second * 300 * 60 +const RandomExpireAdjustment = 0.2 + +//cache key +const groupInfoCache = "GROUP_INFO_CACHE:" + +func NewGroupModel(ctx context.Context) { + var groupModel GroupModel + redisClient := cache.InitRedis(ctx) + rdb := cache.NewRedisClient(redisClient) + groupModel.rdb = rdb + groupModel.db = mysql.NewGroupDB() + groupModel.strongRc = cache.NewRcClient(redisClient, GroupExpireTime, rockscache.Options{ + RandomExpireAdjustment: RandomExpireAdjustment, + DisableCacheRead: false, + DisableCacheDelete: false, + StrongConsistency: true, + }) + groupModel.weakRc = cache.NewRcClient(redisClient, GroupExpireTime, rockscache.Options{ + RandomExpireAdjustment: RandomExpireAdjustment, + DisableCacheRead: false, + DisableCacheDelete: false, + StrongConsistency: false, + }) +} + +func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { + for _, groupID := range groupIDs { + group, err := g.getGroupInfoFromCache(ctx, groupID) + if err != nil { + return nil, err + } + groups = append(groups, group) + } + return groups, nil +} + +func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error { + return g.db.Create(ctx, groups) +} + +func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { + tx := g.db.DB.Begin() + if err := g.db.Delete(ctx, groupIDs); err != nil { + tx.Commit() + return err + } + if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil { + tx.Rollback() + return err + } + return nil +} + +func (g *GroupModel) getGroupCacheKey(groupID string) string { + return groupInfoCache + groupID +} + +func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error { + for _, groupID := range groupIDs { + if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil { + return err + } + } + return nil +} + +func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { + getGroupInfo := func() (string, error) { + groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) + if err != nil { + return "", utils.Wrap(err, "") + } + bytes, err := json.Marshal(groupInfo) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + groupInfo = &mysql.Group{} + defer func() { + trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) + }() + groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo) + if err != nil { + return nil, utils.Wrap(err, "") + } + err = json.Unmarshal([]byte(groupInfoStr), groupInfo) + return groupInfo, utils.Wrap(err, "") +} diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/mongo/batch_insert_chat.go similarity index 88% rename from pkg/common/db/batch_insert_chat.go rename to pkg/common/db/mongo/batch_insert_chat.go index c9174ad4d..e81e3e800 100644 --- a/pkg/common/db/batch_insert_chat.go +++ b/pkg/common/db/mongo/batch_insert_chat.go @@ -1,8 +1,9 @@ -package db +package mongo import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" pbMsg "Open_IM/pkg/proto/msg" @@ -15,7 +16,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error { +func (d *db.DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error { newTime := getCurrentTimestampByMill() if len(msgList) > GetSingleGocMsgNum() { return errors.New("too large") @@ -24,17 +25,17 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo var remain uint64 blk0 := uint64(GetSingleGocMsgNum() - 1) //currentMaxSeq 4998 - if currentMaxSeq < uint64(GetSingleGocMsgNum()) { + if currentMaxSeq < uint64(mongo2.GetSingleGocMsgNum()) { remain = blk0 - currentMaxSeq //1 } else { excludeBlk0 := currentMaxSeq - blk0 //=1 //(5000-1)%5000 == 4999 - remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum()) + remain = (uint64(mongo2.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(mongo2.GetSingleGocMsgNum()))) % uint64(mongo2.GetSingleGocMsgNum()) } //remain=1 insertCounter := uint64(0) - msgListToMongo := make([]MsgInfo, 0) - msgListToMongoNext := make([]MsgInfo, 0) + msgListToMongo := make([]mongo2.MsgInfo, 0) + msgListToMongoNext := make([]mongo2.MsgInfo, 0) seqUid := "" seqUidNext := "" log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList)) @@ -42,7 +43,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo for _, m := range msgList { log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) currentMaxSeq++ - sMsg := MsgInfo{} + sMsg := mongo2.MsgInfo{} sMsg.SendTime = m.MsgData.SendTime m.MsgData.Seq = uint32(currentMaxSeq) log.Debug(operationID, "mongo msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", userID, "seq: ", currentMaxSeq) @@ -51,24 +52,24 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo } if isInit { msgListToMongoNext = append(msgListToMongoNext, sMsg) - seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq)) log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) continue } if insertCounter < remain { msgListToMongo = append(msgListToMongo, sMsg) insertCounter++ - seqUid = getSeqUid(userID, uint32(currentMaxSeq)) + seqUid = mongo2.getSeqUid(userID, uint32(currentMaxSeq)) log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) } else { msgListToMongoNext = append(msgListToMongoNext, sMsg) - seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq)) log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) } } ctx := context.Background() - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(mongo2.cChat) if seqUid != "" { filter := bson.M{"uid": seqUid} @@ -77,7 +78,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo if err != nil { if err == mongo.ErrNoDocuments { filter := bson.M{"uid": seqUid} - sChat := UserChat{} + sChat := mongo2.UserChat{} sChat.UID = seqUid sChat.Msg = msgListToMongo log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) @@ -98,7 +99,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo } if seqUidNext != "" { filter := bson.M{"uid": seqUidNext} - sChat := UserChat{} + sChat := mongo2.UserChat{} sChat.UID = seqUidNext sChat.Msg = msgListToMongoNext log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) @@ -109,14 +110,14 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo } promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter) } - log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) + log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList)) return nil } -func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { - newTime := getCurrentTimestampByMill() +func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { + newTime := mongo2.getCurrentTimestampByMill() lenList := len(msgList) - if lenList > GetSingleGocMsgNum() { + if lenList > mongo2.GetSingleGocMsgNum() { return errors.New("too large"), 0 } if lenList < 1 { @@ -142,7 +143,7 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD for _, m := range msgList { currentMaxSeq++ - sMsg := MsgInfo{} + sMsg := mongo2.MsgInfo{} sMsg.SendTime = m.MsgData.SendTime m.MsgData.Seq = uint32(currentMaxSeq) log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq) @@ -155,7 +156,7 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD } else { promePkg.PromeInc(promePkg.MsgInsertRedisSuccessCounter) } - log.Debug(operationID, "batch to redis cost time ", getCurrentTimestampByMill()-newTime, insertID, len(msgList)) + log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, insertID, len(msgList)) if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { err = d.SetGroupMaxSeq(insertID, currentMaxSeq) } else { diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/mongo/extend_msg_mongo_model.go similarity index 87% rename from pkg/common/db/extend_msg_mongo_model.go rename to pkg/common/db/mongo/extend_msg_mongo_model.go index 07d853418..582c43f96 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/mongo/extend_msg_mongo_model.go @@ -1,7 +1,8 @@ -package db +package mongo import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/db" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" @@ -57,7 +58,7 @@ func SplitSourceIDAndGetIndex(sourceID string) int32 { return int32(index) } -func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { +func (d *db.DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) _, err := c.InsertOne(ctx, set) @@ -68,7 +69,7 @@ type GetAllExtendMsgSetOpts struct { ExcludeExtendMsgs bool } -func (d *DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) { +func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) regex := fmt.Sprintf("^%s", ID) @@ -90,7 +91,7 @@ func (d *DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) return sets, nil } -func (d *DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) { +func (d *db.DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) { regex := fmt.Sprintf("^%s", sourceID) var err error findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0}) @@ -114,7 +115,7 @@ func (d *DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessio } // first modify msg -func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { +func (d *db.DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, 0, c) @@ -141,7 +142,7 @@ func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *Ext } // insert or update -func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { +func (d *db.DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) var updateBson = bson.M{} @@ -164,7 +165,7 @@ func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionT } // delete TypeKey -func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { +func (d *db.DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) var updateBson = bson.M{} @@ -182,7 +183,7 @@ func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int3 return err } -func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) { +func (d *db.DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1}) diff --git a/pkg/common/db/model.go b/pkg/common/db/mongo/init_mongo.go similarity index 56% rename from pkg/common/db/model.go rename to pkg/common/db/mongo/init_mongo.go index bf2058e51..b61991506 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/mongo/init_mongo.go @@ -1,58 +1,19 @@ -package db +package mongo import ( "Open_IM/pkg/common/config" - "github.com/dtm-labs/rockscache" - "go.mongodb.org/mongo-driver/x/bsonx" - "strings" - - //"Open_IM/pkg/common/log" "Open_IM/pkg/utils" - "fmt" - go_redis "github.com/go-redis/redis/v8" - "go.mongodb.org/mongo-driver/mongo/options" - - "gopkg.in/mgo.v2" - "time" - "context" - //"go.mongodb.org/mongo-driver/bson" + "fmt" "go.mongodb.org/mongo-driver/mongo" - // "go.mongodb.org/mongo-driver/mongo/options" - //go_redis "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx" + "log" + "strings" + "time" ) -var DB DataBases - -type DataBases struct { - MysqlDB mysqlDB - mgoSession *mgo.Session - //redisPool *redis.Pool - mongoClient *mongo.Client - RDB go_redis.UniversalClient - Rc *rockscache.Client - WeakRc *rockscache.Client -} - -type RedisClient struct { - client *go_redis.Client - cluster *go_redis.ClusterClient - go_redis.UniversalClient - enableCluster bool -} - -func key(dbAddress, dbName string) string { - return dbAddress + "_" + dbName -} - -func init() { - var mongoClient *mongo.Client - var err1 error - fmt.Println("init mysql redis mongo ") - - initMysqlDB() - // mongo init - // "mongodb://sysop:moon@localhost/records" +func InitMongoClient() *mongo.Client { uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" if config.Config.Mongo.DBUri != "" { // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize @@ -67,7 +28,6 @@ func init() { mongodbHosts += v + "," } } - if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" { // clientOpts := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018/?replicaSet=replset") //mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] @@ -81,13 +41,13 @@ func init() { config.Config.Mongo.DBMaxPoolSize) } } - + log.Println("start to init mongoDB:", uri) mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) if err != nil { time.Sleep(time.Duration(30) * time.Second) - mongoClient, err1 = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) - if err1 != nil { - panic(err1.Error() + " mongo.Connect failed " + uri) + mongoClient, err = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) + if err != nil { + panic(err.Error() + " mongo.Connect failed " + uri) } } // mongodb create index @@ -95,7 +55,7 @@ func init() { panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time") } if err := createMongoIndex(mongoClient, cChat, false, "uid"); err != nil { - fmt.Println(err.Error() + " index create failed " + cChat + " uid ") + fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid") } if err := createMongoIndex(mongoClient, cWorkMoment, true, "-create_time", "work_moment_id"); err != nil { panic(err.Error() + "index create failed " + cWorkMoment + " -create_time, work_moment_id") @@ -112,44 +72,7 @@ func init() { if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil { panic(err.Error() + "index create failed " + cTag + " tag_id") } - DB.mongoClient = mongoClient - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if config.Config.Redis.EnableCluster { - DB.RDB = go_redis.NewClusterClient(&go_redis.ClusterOptions{ - Addrs: config.Config.Redis.DBAddress, - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set - PoolSize: 50, - }) - _, err = DB.RDB.Ping(ctx).Result() - if err != nil { - fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress) - panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord) - } - } else { - DB.RDB = go_redis.NewClient(&go_redis.Options{ - Addr: config.Config.Redis.DBAddress[0], - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set - DB: 0, // use default DB - PoolSize: 100, // 连接池大小 - }) - _, err = DB.RDB.Ping(ctx).Result() - if err != nil { - panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord) - } - } - // 强一致性缓存,当一个key被标记删除,其他请求线程会被锁住轮询直到新的key生成,适合各种同步的拉取, 如果弱一致可能导致拉取还是老数据,毫无意义 - DB.Rc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions()) - DB.Rc.Options.StrongConsistency = true - - // 弱一致性缓存,当一个key被标记删除,其他请求线程直接返回该key的value,适合高频并且生成很缓存很慢的情况 如大群发消息缓存的缓存 - DB.WeakRc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions()) - DB.WeakRc.Options.StrongConsistency = false - - fmt.Println("init mysql redis mongo ok ") + return mongoClient } func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error { @@ -159,7 +82,7 @@ func createMongoIndex(client *mongo.Client, collection string, isUnique bool, ke indexView := db.Indexes() keysDoc := bsonx.Doc{} - // 复合索引 + // create composite indexes for _, key := range keys { if strings.HasPrefix(key, "-") { keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1)) @@ -168,7 +91,7 @@ func createMongoIndex(client *mongo.Client, collection string, isUnique bool, ke } } - // 创建索引 + // create index index := mongo.IndexModel{ Keys: keysDoc, } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongo/mongo_model.go similarity index 89% rename from pkg/common/db/mongoModel.go rename to pkg/common/db/mongo/mongo_model.go index 4d7d01f80..1f46b3156 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongo/mongo_model.go @@ -1,8 +1,9 @@ -package db +package mongo import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/msg" open_im_sdk "Open_IM/pkg/proto/sdk_ws" @@ -59,7 +60,7 @@ type GroupMember_x struct { var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") -func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) { +func (d *db.DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) { return 1, nil //var i, NB uint32 //var seqUid string @@ -89,12 +90,12 @@ func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) { //return MinSeq, nil } -func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { +func (d *db.DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { return 1, nil } // deleteMsgByLogic -func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) { +func (d *db.DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) { log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList) sortkeys.Uint32s(seqList) suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { @@ -130,7 +131,7 @@ func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID return totalUnexistSeqList, err } -func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) { +func (d *db.DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) { log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList) seqMsgList, indexList, unexistSeqList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID) if err != nil { @@ -145,7 +146,7 @@ func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint3 } // deleteMsgByLogic -func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error { +func (d *db.DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error { sortkeys.Uint32s(seqList) seqMsgs, err := d.GetMsgBySeqListMongo2(uid, seqList, operationID) if err != nil { @@ -161,7 +162,7 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string return nil } -func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { +func (d *db.DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -182,7 +183,7 @@ func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgD return nil } -func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error { +func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -207,14 +208,14 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat return nil } -func (d *DataBases) UpdateOneMsgList(msg *UserChat) error { +func (d *db.DataBases) UpdateOneMsgList(msg *UserChat) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) _, err := c.UpdateOne(ctx, bson.M{"uid": msg.UID}, bson.M{"$set": bson.M{"msg": msg.Msg}}) return err } -func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { +func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList) var hasSeqList []uint32 singleCount := 0 @@ -270,7 +271,7 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st return seqMsg, nil } -func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) { +func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) @@ -292,14 +293,14 @@ func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, er } } -func (d *DataBases) DelMongoMsgs(IDList []string) error { +func (d *db.DataBases) DelMongoMsgs(IDList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) _, err := c.DeleteMany(ctx, bson.M{"uid": bson.M{"$in": IDList}}) return err } -func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) { +func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) userChat := &UserChat{} @@ -327,7 +328,7 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replac return replaceMaxSeq, err } -func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { +func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) @@ -355,7 +356,7 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error return nil, nil } -func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { +func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) @@ -390,7 +391,7 @@ func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error return nil, nil } -func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { +func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0 ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) @@ -441,7 +442,7 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio } return seqMsg, nil } -func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { +func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0 ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) @@ -493,7 +494,7 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin return seqMsg, nil } -func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) { +func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) sChat := UserChat{} @@ -548,7 +549,7 @@ func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (e return exceptionMsg } -func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { +func (d *db.DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) newTime := getCurrentTimestampByMill() @@ -620,7 +621,7 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD // return nil //} -func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { +func (d *db.DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { var seqUid string newTime := getCurrentTimestampByMill() session := d.mgoSession.Clone() @@ -659,7 +660,7 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD return nil } -func (d *DataBases) DelUserChat(uid string) error { +func (d *db.DataBases) DelUserChat(uid string) error { return nil //session := d.mgoSession.Clone() //if session == nil { @@ -677,7 +678,7 @@ func (d *DataBases) DelUserChat(uid string) error { //return nil } -func (d *DataBases) DelUserChatMongo2(uid string) error { +func (d *db.DataBases) DelUserChatMongo2(uid string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) filter := bson.M{"uid": uid} @@ -689,7 +690,7 @@ func (d *DataBases) DelUserChatMongo2(uid string) error { return nil } -func (d *DataBases) MgoUserCount() (int, error) { +func (d *db.DataBases) MgoUserCount() (int, error) { return 0, nil //session := d.mgoSession.Clone() //if session == nil { @@ -702,7 +703,7 @@ func (d *DataBases) MgoUserCount() (int, error) { //return c.Find(nil).Count() } -func (d *DataBases) MgoSkipUID(count int) (string, error) { +func (d *db.DataBases) MgoSkipUID(count int) (string, error) { return "", nil //session := d.mgoSession.Clone() //if session == nil { @@ -717,7 +718,7 @@ func (d *DataBases) MgoSkipUID(count int) (string, error) { //return sChat.UID, nil } -func (d *DataBases) GetGroupMember(groupID string) []string { +func (d *db.DataBases) GetGroupMember(groupID string) []string { return nil //groupInfo := GroupMember_x{} //groupInfo.GroupID = groupID @@ -738,7 +739,7 @@ func (d *DataBases) GetGroupMember(groupID string) []string { //return groupInfo.UIDList } -func (d *DataBases) AddGroupMember(groupID, uid string) error { +func (d *db.DataBases) AddGroupMember(groupID, uid string) error { return nil //session := d.mgoSession.Clone() //if session == nil { @@ -771,7 +772,7 @@ func (d *DataBases) AddGroupMember(groupID, uid string) error { //return nil } -func (d *DataBases) DelGroupMember(groupID, uid string) error { +func (d *db.DataBases) DelGroupMember(groupID, uid string) error { return nil //session := d.mgoSession.Clone() //if session == nil { @@ -795,7 +796,7 @@ type Tag struct { UserList []string `bson:"user_list"` } -func (d *DataBases) GetUserTags(userID string) ([]Tag, error) { +func (d *db.DataBases) GetUserTags(userID string) ([]Tag, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) var tags []Tag @@ -809,7 +810,7 @@ func (d *DataBases) GetUserTags(userID string) ([]Tag, error) { return tags, nil } -func (d *DataBases) CreateTag(userID, tagName string, userList []string) error { +func (d *db.DataBases) CreateTag(userID, tagName string, userList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) tagID := generateTagID(tagName, userID) @@ -823,7 +824,7 @@ func (d *DataBases) CreateTag(userID, tagName string, userList []string) error { return err } -func (d *DataBases) GetTagByID(userID, tagID string) (Tag, error) { +func (d *db.DataBases) GetTagByID(userID, tagID string) (Tag, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) var tag Tag @@ -831,14 +832,14 @@ func (d *DataBases) GetTagByID(userID, tagID string) (Tag, error) { return tag, err } -func (d *DataBases) DeleteTag(userID, tagID string) error { +func (d *db.DataBases) DeleteTag(userID, tagID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) _, err := c.DeleteOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}) return err } -func (d *DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error { +func (d *db.DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) var tag Tag @@ -873,7 +874,7 @@ func (d *DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []s return nil } -func (d *DataBases) GetUserIDListByTagID(userID, tagID string) ([]string, error) { +func (d *db.DataBases) GetUserIDListByTagID(userID, tagID string) ([]string, error) { var tag Tag ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag) @@ -894,14 +895,14 @@ type TagSendLog struct { SendTime int64 `bson:"send_time"` } -func (d *DataBases) SaveTagSendLog(tagSendLog *TagSendLog) error { +func (d *db.DataBases) SaveTagSendLog(tagSendLog *TagSendLog) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSendLog) _, err := c.InsertOne(ctx, tagSendLog) return err } -func (d *DataBases) GetTagSendLogs(userID string, showNumber, pageNumber int32) ([]TagSendLog, error) { +func (d *db.DataBases) GetTagSendLogs(userID string, showNumber, pageNumber int32) ([]TagSendLog, error) { var tagSendLogs []TagSendLog ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSendLog) @@ -947,7 +948,7 @@ type Comment struct { CreateTime int32 `bson:"create_time" json:"create_time"` } -func (d *DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error { +func (d *db.DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) workMomentID := generateWorkMomentID(workMoment.UserID) @@ -957,14 +958,14 @@ func (d *DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error { return err } -func (d *DataBases) DeleteOneWorkMoment(workMomentID string) error { +func (d *db.DataBases) DeleteOneWorkMoment(workMomentID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) _, err := c.DeleteOne(ctx, bson.M{"work_moment_id": workMomentID}) return err } -func (d *DataBases) DeleteComment(workMomentID, contentID, opUserID string) error { +func (d *db.DataBases) DeleteComment(workMomentID, contentID, opUserID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) _, err := c.UpdateOne(ctx, bson.D{{"work_moment_id", workMomentID}, @@ -976,7 +977,7 @@ func (d *DataBases) DeleteComment(workMomentID, contentID, opUserID string) erro return err } -func (d *DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error) { +func (d *db.DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) workMoment := &WorkMoment{} @@ -984,7 +985,7 @@ func (d *DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error) return workMoment, err } -func (d *DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string) (*WorkMoment, bool, error) { +func (d *db.DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string) (*WorkMoment, bool, error) { workMoment, err := d.GetWorkMomentByID(workMomentID) if err != nil { return nil, false, err @@ -1006,11 +1007,11 @@ func (d *DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string) return workMoment, !isAlreadyLike, err } -func (d *DataBases) SetUserWorkMomentsLevel(userID string, level int32) error { +func (d *db.DataBases) SetUserWorkMomentsLevel(userID string, level int32) error { return nil } -func (d *DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string) (WorkMoment, error) { +func (d *db.DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string) (WorkMoment, error) { comment.ContentID = generateWorkMomentCommentID(workMomentID) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) @@ -1019,7 +1020,7 @@ func (d *DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string) return workMoment, err } -func (d *DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber int32) ([]WorkMoment, error) { +func (d *db.DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber int32) ([]WorkMoment, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) var workMomentList []WorkMoment @@ -1032,7 +1033,7 @@ func (d *DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber return workMomentList, err } -func (d *DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]WorkMoment, error) { +func (d *db.DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]WorkMoment, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) var workMomentList []WorkMoment @@ -1052,7 +1053,7 @@ func (d *DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, page return workMomentList, err } -func (d *DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, userID string, friendIDList []string) ([]WorkMoment, error) { +func (d *db.DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, userID string, friendIDList []string) ([]WorkMoment, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment) var workMomentList []WorkMoment @@ -1100,7 +1101,7 @@ type UserToSuperGroup struct { GroupIDList []string `bson:"group_id_list" json:"groupIDList"` } -func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error { +func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) session, err := d.mongoClient.StartSession() @@ -1145,7 +1146,7 @@ func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, return err } -func (d *DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { +func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) superGroup := SuperGroup{} @@ -1153,7 +1154,7 @@ func (d *DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { return superGroup, err } -func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error { +func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) session, err := d.mongoClient.StartSession() @@ -1192,7 +1193,7 @@ func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) err return err } -func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error { +func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) session, err := d.mongoClient.StartSession() @@ -1215,7 +1216,7 @@ func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []strin return err } -func (d *DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) { +func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) var user UserToSuperGroup @@ -1223,7 +1224,7 @@ func (d *DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, erro return user, nil } -func (d *DataBases) DeleteSuperGroup(groupID string) error { +func (d *db.DataBases) DeleteSuperGroup(groupID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) session, err := d.mongoClient.StartSession() @@ -1247,7 +1248,7 @@ func (d *DataBases) DeleteSuperGroup(groupID string) error { return nil } -func (d *DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error { +func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error { var users []UserToSuperGroup for _, v := range userIDList { users = append(users, UserToSuperGroup{ @@ -1342,7 +1343,7 @@ func superGroupIndexGen(groupID string, seqSuffix uint32) string { return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) } -func (d *DataBases) CleanUpUserMsgFromMongo(userID string, operationID string) error { +func (d *db.DataBases) CleanUpUserMsgFromMongo(userID string, operationID string) error { ctx := context.Background() c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) maxSeq, err := d.GetUserMaxSeq(userID) diff --git a/pkg/common/db/mongo/office.go b/pkg/common/db/mongo/office.go new file mode 100644 index 000000000..af2a1bf5b --- /dev/null +++ b/pkg/common/db/mongo/office.go @@ -0,0 +1 @@ +package mongo diff --git a/pkg/common/db/mysql.go b/pkg/common/db/mysql.go deleted file mode 100644 index c87d3f6f7..000000000 --- a/pkg/common/db/mysql.go +++ /dev/null @@ -1,157 +0,0 @@ -package db - -import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - "Open_IM/pkg/utils" - "fmt" - "time" - - "gorm.io/driver/mysql" - "gorm.io/gorm" - "gorm.io/gorm/logger" -) - -type mysqlDB struct { - //sync.RWMutex - db *gorm.DB -} - -type Writer struct{} - -func (w Writer) Printf(format string, args ...interface{}) { - fmt.Printf(format, args...) -} - -func initMysqlDB() { - dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", - config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql") - var db *gorm.DB - var err1 error - db, err := gorm.Open(mysql.Open(dsn), nil) - if err != nil { - time.Sleep(time.Duration(30) * time.Second) - db, err1 = gorm.Open(mysql.Open(dsn), nil) - if err1 != nil { - panic(err1.Error() + " open failed " + dsn) - } - } - sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName) - err = db.Exec(sql).Error - if err != nil { - panic(err.Error() + " Exec failed " + sql) - } - dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", - config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) - newLogger := logger.New( - Writer{}, - logger.Config{ - SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold - LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level - IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger - Colorful: true, // Disable color - }, - ) - db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ - Logger: newLogger, - }) - if err != nil { - panic(err.Error() + " Open failed " + dsn) - } - - sqlDB, err := db.DB() - if err != nil { - panic(err.Error() + " db.DB() failed ") - } - - sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime)) - sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) - sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) - - db.AutoMigrate( - &im_mysql_model.Friend{}, - &im_mysql_model.FriendRequest{}, - &im_mysql_model.Group{}, - &im_mysql_model.GroupMember{}, - &im_mysql_model.GroupRequest{}, - &im_mysql_model.User{}, - &im_mysql_model.Black{}, &im_mysql_model.ChatLog{}, &im_mysql_model.Conversation{}, &im_mysql_model.AppVersion{}, &im_mysql_model.BlackList{}, - ) - db.Set("gorm:table_options", "CHARSET=utf8") - db.Set("gorm:table_options", "collation=utf8_unicode_ci") - - if !db.Migrator().HasTable(&im_mysql_model.Friend{}) { - db.Migrator().CreateTable(&im_mysql_model.Friend{}) - } - if !db.Migrator().HasTable(&im_mysql_model.FriendRequest{}) { - db.Migrator().CreateTable(&im_mysql_model.FriendRequest{}) - } - if !db.Migrator().HasTable(&im_mysql_model.Group{}) { - db.Migrator().CreateTable(&im_mysql_model.Group{}) - } - if !db.Migrator().HasTable(&im_mysql_model.GroupMember{}) { - db.Migrator().CreateTable(&im_mysql_model.GroupMember{}) - } - if !db.Migrator().HasTable(&im_mysql_model.GroupRequest{}) { - db.Migrator().CreateTable(&im_mysql_model.GroupRequest{}) - } - if !db.Migrator().HasTable(&im_mysql_model.User{}) { - db.Migrator().CreateTable(&im_mysql_model.User{}) - } - if !db.Migrator().HasTable(&im_mysql_model.Black{}) { - db.Migrator().CreateTable(&im_mysql_model.Black{}) - } - if !db.Migrator().HasTable(&im_mysql_model.ChatLog{}) { - db.Migrator().CreateTable(&im_mysql_model.ChatLog{}) - } - if !db.Migrator().HasTable(&im_mysql_model.Register{}) { - db.Migrator().CreateTable(&im_mysql_model.Register{}) - } - if !db.Migrator().HasTable(&im_mysql_model.Conversation{}) { - db.Migrator().CreateTable(&im_mysql_model.Conversation{}) - } - - DB.MysqlDB.db = db - im_mysql_model.GroupDB = db.Table("groups") - im_mysql_model.GroupMemberDB = db.Table("group_members") - im_mysql_model.UserDB = db.Table("users") - im_mysql_model.ChatLogDB = db.Table("chat_logs") - im_mysql_model.BlackListDB = db.Table("black_lists") - im_mysql_model.BlackDB = db.Table("blacks") - im_mysql_model.AppDB = db.Table("app_version") - im_mysql_model.BlackDB = db.Table("blacks") - im_mysql_model.ConversationDB = db.Table("conversations") - im_mysql_model.FriendDB = db.Table("friends") - im_mysql_model.FriendRequestDB = db.Table("friend_requests") - im_mysql_model.GroupRequestDB = db.Table("group_requests") - InitManager() -} - -func InitManager() { - for k, v := range config.Config.Manager.AppManagerUid { - _, err := im_mysql_model.GetUserByUserID(v) - if err != nil { - } else { - continue - } - var appMgr im_mysql_model.User - appMgr.UserID = v - if k == 0 { - appMgr.Nickname = config.Config.Manager.AppSysNotificationName - } else { - appMgr.Nickname = "AppManager" + utils.IntToString(k+1) - } - appMgr.AppMangerLevel = constant.AppAdmin - err = im_mysql_model.UserRegister(appMgr) - if err != nil { - fmt.Println("AppManager insert error ", err.Error(), appMgr) - } else { - fmt.Println("AppManager insert ", appMgr) - } - } -} - -func (m *mysqlDB) DefaultGormDB() *gorm.DB { - return DB.MysqlDB.db -} diff --git a/pkg/common/db/mysql_model/im_mysql_model/message_cms.go b/pkg/common/db/mysql/chat_log_model.go similarity index 70% rename from pkg/common/db/mysql_model/im_mysql_model/message_cms.go rename to pkg/common/db/mysql/chat_log_model.go index d6b83650e..ccc53ceae 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/message_cms.go +++ b/pkg/common/db/mysql/chat_log_model.go @@ -1,8 +1,15 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" + pbMsg "Open_IM/pkg/proto/msg" + server_api_params "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/jinzhu/copier" "gorm.io/gorm" "time" ) @@ -31,6 +38,34 @@ func (ChatLog) TableName() string { return "chat_logs" } +func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error { + chatLog := new(ChatLog) + copier.Copy(chatLog, msg.MsgData) + switch msg.MsgData.SessionType { + case constant.GroupChatType, constant.SuperGroupChatType: + chatLog.RecvID = msg.MsgData.GroupID + case constant.SingleChatType: + chatLog.RecvID = msg.MsgData.RecvID + } + if msg.MsgData.ContentType >= constant.NotificationBegin && msg.MsgData.ContentType <= constant.NotificationEnd { + var tips server_api_params.TipsComm + _ = proto.Unmarshal(msg.MsgData.Content, &tips) + marshaler := jsonpb.Marshaler{ + OrigName: true, + EnumsAsInts: false, + EmitDefaults: false, + } + chatLog.Content, _ = marshaler.MarshalToString(&tips) + + } else { + chatLog.Content = string(msg.MsgData.Content) + } + chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime) + chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime) + log.NewDebug("test", "this is ", chatLog) + return db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").Create(chatLog).Error +} + func GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLog, error) { mdb := ChatLogDB.Table("chat_logs") if chatLog.SendTime.Unix() > 0 { diff --git a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go b/pkg/common/db/mysql/conversation_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/conversation_model.go rename to pkg/common/db/mysql/conversation_model.go index 06f1737eb..c487fb697 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go +++ b/pkg/common/db/mysql/conversation_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "gorm.io/gorm" diff --git a/pkg/common/db/mysql_model/im_mysql_model/demo_model.go b/pkg/common/db/mysql/demo_model.go similarity index 98% rename from pkg/common/db/mysql_model/im_mysql_model/demo_model.go rename to pkg/common/db/mysql/demo_model.go index da3dbda25..e1ead3baf 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/demo_model.go +++ b/pkg/common/db/mysql/demo_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "gorm.io/gorm" diff --git a/pkg/common/db/mysql_model/im_mysql_model/file_model.go b/pkg/common/db/mysql/file_model.go similarity index 98% rename from pkg/common/db/mysql_model/im_mysql_model/file_model.go rename to pkg/common/db/mysql/file_model.go index 95bd65807..36c03ea37 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/file_model.go +++ b/pkg/common/db/mysql/file_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "gorm.io/gorm" diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go b/pkg/common/db/mysql/friend_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/friend_model.go rename to pkg/common/db/mysql/friend_model.go index 4bba5931d..cc2d1e886 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go +++ b/pkg/common/db/mysql/friend_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "fmt" diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_model_k.go b/pkg/common/db/mysql/friend_model_k.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/friend_model_k.go rename to pkg/common/db/mysql/friend_model_k.go index 9f2105079..9523ecb42 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_model_k.go +++ b/pkg/common/db/mysql/friend_model_k.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go b/pkg/common/db/mysql/friend_request_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go rename to pkg/common/db/mysql/friend_request_model.go index ce0563c29..f39f3beb7 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go +++ b/pkg/common/db/mysql/friend_request_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go b/pkg/common/db/mysql/group_member_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/group_member_model.go rename to pkg/common/db/mysql/group_member_model.go index bccaf8c6e..2cbab3a26 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go +++ b/pkg/common/db/mysql/group_member_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql //type GroupMember struct { // GroupID string `gorm:"column:group_id;primaryKey;"` diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go b/pkg/common/db/mysql/group_member_model_k.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go rename to pkg/common/db/mysql/group_member_model_k.go index b695119ad..57d808a5b 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go +++ b/pkg/common/db/mysql/group_member_model_k.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/constant" diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql/group_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/group_model.go rename to pkg/common/db/mysql/group_model.go index f5e5b9c08..66b53abe7 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql/group_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/constant" diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model_k.go b/pkg/common/db/mysql/group_model_k.go similarity index 95% rename from pkg/common/db/mysql_model/im_mysql_model/group_model_k.go rename to pkg/common/db/mysql/group_model_k.go index 213347758..b6797109a 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model_k.go +++ b/pkg/common/db/mysql/group_model_k.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" @@ -8,8 +8,6 @@ import ( "time" ) -var GroupDB *gorm.DB - type Group struct { GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"` GroupName string `gorm:"column:name;size:255" json:"groupName"` @@ -26,6 +24,13 @@ type Group struct { ApplyMemberFriend int32 `gorm:"column:apply_member_friend" json:"applyMemberFriend"` NotificationUpdateTime time.Time `gorm:"column:notification_update_time"` NotificationUserID string `gorm:"column:notification_user_id;size:64"` + DB *gorm.DB +} + +func NewGroupDB() *Group { + var group Group + group.DB = initMysqlDB(&group) + return &group } func (*Group) Create(ctx context.Context, groups []*Group) (err error) { diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go b/pkg/common/db/mysql/group_request_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/group_request_model.go rename to pkg/common/db/mysql/group_request_model.go index 06a09f16a..265d133e7 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go +++ b/pkg/common/db/mysql/group_request_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql // //func UpdateGroupRequest(groupRequest GroupRequest) error { diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model_k.go b/pkg/common/db/mysql/group_request_model_k.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/group_request_model_k.go rename to pkg/common/db/mysql/group_request_model_k.go index 1f80452cd..21409f62e 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model_k.go +++ b/pkg/common/db/mysql/group_request_model_k.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" diff --git a/pkg/common/db/mysql/init_mysql.go b/pkg/common/db/mysql/init_mysql.go new file mode 100644 index 000000000..1ec3cc8ab --- /dev/null +++ b/pkg/common/db/mysql/init_mysql.go @@ -0,0 +1,67 @@ +package mysql + +import ( + "Open_IM/pkg/common/config" + "fmt" + "time" + + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +func initMysqlDB(model interface{}) *gorm.DB { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", + config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql") + var db *gorm.DB + db, err := gorm.Open(mysql.Open(dsn), nil) + if err != nil { + time.Sleep(time.Duration(30) * time.Second) + db, err = gorm.Open(mysql.Open(dsn), nil) + if err != nil { + panic(err.Error() + " open failed " + dsn) + } + } + sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName) + err = db.Exec(sql).Error + if err != nil { + panic(err.Error() + " Exec failed:" + sql) + } + dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", + config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) + newLogger := logger.New( + Writer{}, + logger.Config{ + SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold + LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level + IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger + Colorful: true, // Disable color + }, + ) + db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ + Logger: newLogger, + }) + if err != nil { + panic(err.Error() + " Open failed " + dsn) + } + sqlDB, err := db.DB() + if err != nil { + panic(err.Error() + " db.DB() failed ") + } + sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime)) + sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) + sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) + //models := []interface{}{&Friend{}, &FriendRequest{}, &Group{}, &GroupMember{}, &GroupRequest{}, + // &User{}, &Black{}, &ChatLog{}, &Conversation{}, &AppVersion{}} + db.AutoMigrate(model) + db.Set("gorm:table_options", "CHARSET=utf8") + db.Set("gorm:table_options", "collation=utf8_unicode_ci") + _ = db.Migrator().CreateTable(model) + return db.Model(model) +} + +type Writer struct{} + +func (w Writer) Printf(format string, args ...interface{}) { + fmt.Printf(format, args...) +} diff --git a/pkg/common/db/mysql_model/im_mysql_model/model_struct.go b/pkg/common/db/mysql/model_struct.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/model_struct.go rename to pkg/common/db/mysql/model_struct.go index b88b19602..2e00f3107 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/model_struct.go +++ b/pkg/common/db/mysql/model_struct.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql //type Register struct { // Account string `gorm:"column:account;primary_key;type:char(255)" json:"account"` diff --git a/pkg/common/db/mysql_model/im_mysql_model/statistics_model.go b/pkg/common/db/mysql/statistics_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/statistics_model.go rename to pkg/common/db/mysql/statistics_model.go index 894c335e1..328fbc9bb 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/statistics_model.go +++ b/pkg/common/db/mysql/statistics_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/constant" diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_black_list_model.go b/pkg/common/db/mysql/user_black_list_model.go similarity index 99% rename from pkg/common/db/mysql_model/im_mysql_model/user_black_list_model.go rename to pkg/common/db/mysql/user_black_list_model.go index 14d91bf85..d3877d2af 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_black_list_model.go +++ b/pkg/common/db/mysql/user_black_list_model.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_model.go b/pkg/common/db/mysql/user_model.go similarity index 60% rename from pkg/common/db/mysql_model/im_mysql_model/user_model.go rename to pkg/common/db/mysql/user_model.go index 21d68d003..757b446fa 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_model.go +++ b/pkg/common/db/mysql/user_model.go @@ -1,9 +1,9 @@ -package im_mysql_model +package mysql import ( + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/utils" - "errors" "fmt" "gorm.io/gorm" "time" @@ -14,10 +14,28 @@ var ( UserDB *gorm.DB ) -type BlackList struct { - UserId string `gorm:"column:uid"` - BeginDisableTime time.Time `gorm:"column:begin_disable_time"` - EndDisableTime time.Time `gorm:"column:end_disable_time"` +func InitManager() { + for k, v := range config.Config.Manager.AppManagerUid { + _, err := GetUserByUserID(v) + if err != nil { + } else { + continue + } + var appMgr User + appMgr.UserID = v + if k == 0 { + appMgr.Nickname = config.Config.Manager.AppSysNotificationName + } else { + appMgr.Nickname = "AppManager" + utils.IntToString(k+1) + } + appMgr.AppMangerLevel = constant.AppAdmin + err = UserRegister(appMgr) + if err != nil { + fmt.Println("AppManager insert error ", err.Error(), appMgr) + } else { + fmt.Println("AppManager insert ", appMgr) + } + } } func UserRegister(user User) error { @@ -130,110 +148,17 @@ func AddUser(userID string, phoneNumber string, name string, email string, gende return result.Error } -func UserIsBlock(userId string) (bool, error) { - var user BlackList - rows := BlackListDB.Where("uid=?", userId).First(&user).RowsAffected - if rows >= 1 { - return user.EndDisableTime.After(time.Now()), nil - } - return false, nil -} - func UsersIsBlock(userIDList []string) (inBlockUserIDList []string, err error) { err = BlackListDB.Where("uid in (?) and end_disable_time > now()", userIDList).Pluck("uid", &inBlockUserIDList).Error return inBlockUserIDList, err } -func BlockUser(userID, endDisableTime string) error { - user, err := GetUserByUserID(userID) - if err != nil || user.UserID == "" { - return err - } - end, err := time.Parse("2006-01-02 15:04:05", endDisableTime) - if err != nil { - return err - } - if end.Before(time.Now()) { - return errors.New("endDisableTime is before now") - } - var blockUser BlackList - BlackListDB.Where("uid=?", userID).First(&blockUser) - if blockUser.UserId != "" { - BlackListDB.Where("uid=?", blockUser.UserId).Update("end_disable_time", end) - return nil - } - blockUser = BlackList{ - UserId: userID, - BeginDisableTime: time.Now(), - EndDisableTime: end, - } - err = BlackListDB.Create(&blockUser).Error - return err -} - -func UnBlockUser(userID string) error { - return BlackListDB.Where("uid=?", userID).Delete(&BlackList{}).Error -} - type BlockUserInfo struct { User User BeginDisableTime time.Time EndDisableTime time.Time } -func GetBlockUserByID(userId string) (BlockUserInfo, error) { - var blockUserInfo BlockUserInfo - blockUser := BlackList{ - UserId: userId, - } - if err := BlackListDB.Table("black_lists").Where("uid=?", userId).Find(&blockUser).Error; err != nil { - return blockUserInfo, err - } - user := User{ - UserID: blockUser.UserId, - } - if err := BlackListDB.Find(&user).Error; err != nil { - return blockUserInfo, err - } - blockUserInfo.User.UserID = user.UserID - blockUserInfo.User.FaceURL = user.FaceURL - blockUserInfo.User.Nickname = user.Nickname - blockUserInfo.User.Birth = user.Birth - blockUserInfo.User.PhoneNumber = user.PhoneNumber - blockUserInfo.User.Email = user.Email - blockUserInfo.User.Gender = user.Gender - blockUserInfo.BeginDisableTime = blockUser.BeginDisableTime - blockUserInfo.EndDisableTime = blockUser.EndDisableTime - return blockUserInfo, nil -} - -func GetBlockUsers(showNumber, pageNumber int32) ([]BlockUserInfo, error) { - var blockUserInfos []BlockUserInfo - var blockUsers []BlackList - if err := BlackListDB.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&blockUsers).Error; err != nil { - return blockUserInfos, err - } - for _, blockUser := range blockUsers { - var user User - if err := UserDB.Table("users").Where("user_id=?", blockUser.UserId).First(&user).Error; err == nil { - blockUserInfos = append(blockUserInfos, BlockUserInfo{ - User: User{ - UserID: user.UserID, - Nickname: user.Nickname, - FaceURL: user.FaceURL, - Birth: user.Birth, - PhoneNumber: user.PhoneNumber, - Email: user.Email, - Gender: user.Gender, - }, - BeginDisableTime: blockUser.BeginDisableTime, - EndDisableTime: blockUser.EndDisableTime, - }) - } - } - return blockUserInfos, nil -} - func GetUserByName(userName string, showNumber, pageNumber int32) ([]User, error) { var users []User err := UserDB.Where(" name like ?", fmt.Sprintf("%%%s%%", userName)).Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&users).Error diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_model_k.go b/pkg/common/db/mysql/user_model_k.go similarity index 98% rename from pkg/common/db/mysql_model/im_mysql_model/user_model_k.go rename to pkg/common/db/mysql/user_model_k.go index 5c0e6e26b..bb93c13c1 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_model_k.go +++ b/pkg/common/db/mysql/user_model_k.go @@ -1,4 +1,4 @@ -package im_mysql_model +package mysql import ( "Open_IM/pkg/common/trace_log" diff --git a/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go b/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go deleted file mode 100644 index e11ef2866..000000000 --- a/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -** description(""). -** copyright('tuoyun,www.tuoyun.net'). -** author("fg,Gordon@tuoyun.net"). -** time(2021/3/4 11:18). - */ -package im_mysql_msg_model - -import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/msg" - "Open_IM/pkg/proto/sdk_ws" - "Open_IM/pkg/utils" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" - "github.com/jinzhu/copier" -) - -func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error { - chatLog := new(im_mysql_model.ChatLog) - copier.Copy(chatLog, msg.MsgData) - switch msg.MsgData.SessionType { - case constant.GroupChatType, constant.SuperGroupChatType: - chatLog.RecvID = msg.MsgData.GroupID - case constant.SingleChatType: - chatLog.RecvID = msg.MsgData.RecvID - } - if msg.MsgData.ContentType >= constant.NotificationBegin && msg.MsgData.ContentType <= constant.NotificationEnd { - var tips server_api_params.TipsComm - _ = proto.Unmarshal(msg.MsgData.Content, &tips) - marshaler := jsonpb.Marshaler{ - OrigName: true, - EnumsAsInts: false, - EmitDefaults: false, - } - chatLog.Content, _ = marshaler.MarshalToString(&tips) - - } else { - chatLog.Content = string(msg.MsgData.Content) - } - chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime) - chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime) - log.NewDebug("test", "this is ", chatLog) - return db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").Create(chatLog).Error -} diff --git a/pkg/common/db/mysql_model/im_mysql_msg_model/hash_code.go b/pkg/common/db/mysql_model/im_mysql_msg_model/hash_code.go deleted file mode 100644 index 2c3e81932..000000000 --- a/pkg/common/db/mysql_model/im_mysql_msg_model/hash_code.go +++ /dev/null @@ -1,16 +0,0 @@ -package im_mysql_msg_model - -import ( - "Open_IM/pkg/common/config" - "hash/crc32" -) - -func getHashMsgDBAddr(userID string) string { - hCode := crc32.ChecksumIEEE([]byte(userID)) - return config.Config.Mysql.DBAddress[hCode%uint32(len(config.Config.Mysql.DBAddress))] -} - -func getHashMsgTableIndex(userID string) int { - hCode := crc32.ChecksumIEEE([]byte(userID)) - return int(hCode % uint32(config.Config.Mysql.DBMsgTableNum)) -}