From 656241c23f2daad909058b5d99891160b0199f90 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 20 Jul 2022 11:35:19 +0800 Subject: [PATCH] docker-compose fix --- pkg/common/db/RedisModel.go | 60 ++++++++++++------------ pkg/common/db/model.go | 10 ++-- pkg/common/db/rocks_cache/rocks_cache.go | 49 ++++++++++++++----- pkg/utils/cache_delete.go | 1 + 4 files changed, 73 insertions(+), 47 deletions(-) create mode 100644 pkg/utils/cache_delete.go diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index bfeccfd69..bf826b682 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -41,7 +41,7 @@ const ( //} func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { key := accountTempCode + account - n, err := d.rdb.Exists(context.Background(), key).Result() + n, err := d.RDB.Exists(context.Background(), key).Result() if n > 0 { return true, err } else { @@ -50,43 +50,43 @@ func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { } func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) { key := accountTempCode + account - return d.rdb.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err() + return d.RDB.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err() } func (d *DataBases) GetAccountCode(account string) (string, error) { key := accountTempCode + account - return d.rdb.Get(context.Background(), key).Result() + return d.RDB.Get(context.Background(), key).Result() } //Perform seq auto-increment operation of user messages func (d *DataBases) IncrUserSeq(uid string) (uint64, error) { key := userIncrSeq + uid - seq, err := d.rdb.Incr(context.Background(), key).Result() + seq, err := d.RDB.Incr(context.Background(), key).Result() return uint64(seq), err } //Get the largest Seq func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { key := userIncrSeq + uid - seq, err := d.rdb.Get(context.Background(), key).Result() + seq, err := d.RDB.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } //set the largest Seq func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { key := userIncrSeq + uid - return d.rdb.Set(context.Background(), key, maxSeq, 0).Err() + return d.RDB.Set(context.Background(), key, maxSeq, 0).Err() } //Set the user's minimum seq func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { key := userMinSeq + uid - return d.rdb.Set(context.Background(), key, minSeq, 0).Err() + return d.RDB.Set(context.Background(), key, minSeq, 0).Err() } //Get the smallest Seq func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { key := userMinSeq + uid - seq, err := d.rdb.Get(context.Background(), key).Result() + seq, err := d.RDB.Get(context.Background(), key).Result() return uint64(utils.StringToInt(seq)), err } @@ -94,13 +94,13 @@ func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) log2.NewDebug("", "add token key is ", key) - return d.rdb.HSet(context.Background(), key, token, flag).Err() + return d.RDB.HSet(context.Background(), key, token, flag).Err() } func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { key := uidPidToken + userID + ":" + platformID log2.NewDebug("", "get token key is ", key) - m, err := d.rdb.HGetAll(context.Background(), key).Result() + m, err := d.RDB.HGetAll(context.Background(), key).Result() mm := make(map[string]int) for k, v := range m { mm[k] = utils.StringToInt(v) @@ -113,29 +113,29 @@ func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[str for k, v := range m { mm[k] = v } - return d.rdb.HSet(context.Background(), key, mm).Err() + return d.RDB.HSet(context.Background(), key, mm).Err() } func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return d.rdb.HDel(context.Background(), key, fields...).Err() + return d.RDB.HDel(context.Background(), key, fields...).Err() } func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { key := conversationReceiveMessageOpt + userID - return d.rdb.HSet(context.Background(), key, conversationID, opt).Err() + return d.RDB.HSet(context.Background(), key, conversationID, opt).Err() } func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { key := conversationReceiveMessageOpt + userID - result, err := d.rdb.HGet(context.Background(), key, conversationID).Result() + result, err := d.RDB.HGet(context.Background(), key, conversationID).Result() return utils.StringToInt(result), err } func (d *DataBases) SetUserGlobalMsgRecvOpt(userID string, opt int32) error { key := conversationReceiveMessageOpt + userID - return d.rdb.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err() + return d.RDB.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err() } func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) { key := conversationReceiveMessageOpt + userID - result, err := d.rdb.HGet(context.Background(), key, GlobalMsgRecvOpt).Result() + result, err := d.RDB.HGet(context.Background(), key, GlobalMsgRecvOpt).Result() if err != nil { if err == go_redis.Nil { return 0, nil @@ -150,7 +150,7 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := d.rdb.Get(context.Background(), key).Result() + result, err := d.RDB.Get(context.Background(), key).Result() if err != nil { errResult = err failedSeqList = append(failedSeqList, v) @@ -173,7 +173,7 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati } func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { ctx := context.Background() - pipe := d.rdb.Pipeline() + pipe := d.RDB.Pipeline() var failedList []pbChat.MsgDataToMQ for _, msg := range msgList { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) @@ -200,7 +200,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { ctx := context.Background() key := messageCache + userID + "_" + "*" - vals, err := d.rdb.Keys(ctx, key).Result() + vals, err := d.RDB.Keys(ctx, key).Result() log2.Debug(operationID, "vals: ", vals) if err == go_redis.Nil { return nil @@ -208,7 +208,7 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str if err != nil { return utils.Wrap(err, "") } - if err = d.rdb.Del(ctx, vals...).Err(); err != nil { + if err = d.RDB.Del(ctx, vals...).Err(); err != nil { return utils.Wrap(err, "") } return nil @@ -244,16 +244,16 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) return err } keyList := SignalListCache + userID - err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err() + err = d.RDB.LPush(context.Background(), keyList, msg.ClientMsgID).Err() if err != nil { return err } - err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() + err = d.RDB.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() if err != nil { return err } key := SignalCache + msg.ClientMsgID - err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() + err = d.RDB.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() if err != nil { return err } @@ -265,7 +265,7 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { key := SignalCache + clientMsgID invitationInfo = &pbRtc.SignalInviteReq{} - bytes, err := d.rdb.Get(context.Background(), key).Bytes() + bytes, err := d.RDB.Get(context.Background(), key).Bytes() if err != nil { return nil, err } @@ -286,7 +286,7 @@ func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (inv func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { keyList := SignalListCache + userID - result := d.rdb.LPop(context.Background(), keyList) + result := d.RDB.LPop(context.Background(), keyList) if err = result.Err(); err != nil { return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed") } @@ -308,14 +308,14 @@ func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationI func (d *DataBases) DelUserSignalList(userID string) error { keyList := SignalListCache + userID - err := d.rdb.Del(context.Background(), keyList).Err() + err := d.RDB.Del(context.Background(), keyList).Err() return err } func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { for _, seq := range seqList { key := messageCache + uid + "_" + strconv.Itoa(int(seq)) - result := d.rdb.Get(context.Background(), key).String() + result := d.RDB.Get(context.Background(), key).String() var msg pbCommon.MsgData if err := utils.String2Pb(result, &msg); err != nil { log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, err.Error()) @@ -327,17 +327,17 @@ func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID st log2.Error(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg, err.Error()) continue } - if err := d.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := d.RDB.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { log2.Error(operationID, utils.GetSelfFuncName(), "Set failed", err.Error()) } } } func (d *DataBases) SetGetuiToken(token string, expireTime int64) error { - return d.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() + return d.RDB.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err() } func (d *DataBases) GetGetuiToken() (string, error) { - result := d.rdb.Get(context.Background(), getuiToken) + result := d.RDB.Get(context.Background(), getuiToken) return result.String(), result.Err() } diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go index be4b651f7..f3c274436 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/model.go @@ -29,7 +29,7 @@ type DataBases struct { mgoSession *mgo.Session //redisPool *redis.Pool mongoClient *mongo.Client - rdb go_redis.UniversalClient + RDB go_redis.UniversalClient Rc *rockscache.Client WeakRc *rockscache.Client } @@ -123,25 +123,25 @@ func init() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if config.Config.Redis.EnableCluster { - DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ + DB.RDB = go_redis.NewClusterClient(&go_redis.ClusterOptions{ Addrs: config.Config.Redis.DBAddress, Username: config.Config.Redis.DBUserName, Password: config.Config.Redis.DBPassWord, // no password set PoolSize: 50, }) - _, err = DB.rdb.Ping(ctx).Result() + _, err = DB.RDB.Ping(ctx).Result() if err != nil { panic(err.Error()) } } else { - DB.rdb = go_redis.NewClient(&go_redis.Options{ + DB.RDB = go_redis.NewClient(&go_redis.Options{ Addr: config.Config.Redis.DBAddress[0], Username: config.Config.Redis.DBUserName, Password: config.Config.Redis.DBPassWord, // no password set DB: 0, // use default DB PoolSize: 100, // 连接池大小 }) - _, err = DB.rdb.Ping(ctx).Result() + _, err = DB.RDB.Ping(ctx).Result() if err != nil { panic(err.Error()) } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index 4d2ba21cd..67514a5bd 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -3,7 +3,9 @@ package rocksCache import ( "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "context" "encoding/json" + "fmt" "strings" "time" ) @@ -23,12 +25,35 @@ const ( allDepartmentMemberCache = "ALL_DEPARTMENT_MEMBER_CACHE:" ) +func init() { + fmt.Println("init to del old keys") + for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, + groupMemberInfoCache, groupAllMemberInfoCache} { + var cursor uint64 + var n int + for { + var keys []string + var err error + keys, cursor, err = db.DB.RDB.Scan(context.Background(), cursor, key+"*", 3000).Result() + if err != nil { + panic(err.Error()) + } + n += len(keys) + fmt.Printf("\n %s key found %d keys: %v, current cursor %d\n", key, n, keys, cursor) + if cursor == 0 { + break + } + } + } + +} + func GetFriendIDListFromCache(userID string) ([]string, error) { getFriendIDList := func() (string, error) { friendIDList, err := imdb.GetFriendIDListByUserID(userID) return strings.Join(friendIDList, ","), err } - friendIDListStr, err := db.DB.Rc.Fetch(friendRelationCache+userID, time.Second*30, getFriendIDList) + friendIDListStr, err := db.DB.Rc.Fetch(friendRelationCache+userID, time.Second*30*60, getFriendIDList) return strings.Split(friendIDListStr, ","), err } @@ -42,7 +67,7 @@ func GetBlackListFromCache(userID string) ([]string, error) { blackIDList, err := imdb.GetBlackIDListByUserID(userID) return strings.Join(blackIDList, ","), err } - blackIDListStr, err := db.DB.Rc.Fetch(blackListCache+userID, time.Second*30, getBlackIDList) + blackIDListStr, err := db.DB.Rc.Fetch(blackListCache+userID, time.Second*30*60, getBlackIDList) return strings.Split(blackIDListStr, ","), err } @@ -55,7 +80,7 @@ func GetJoinedGroupIDListFromCache(userID string) ([]string, error) { joinedGroupList, err := imdb.GetJoinedGroupIDListByUserID(userID) return strings.Join(joinedGroupList, ","), err } - joinedGroupIDListStr, err := db.DB.Rc.Fetch(joinedGroupListCache+userID, time.Second*30, getJoinedGroupIDList) + joinedGroupIDListStr, err := db.DB.Rc.Fetch(joinedGroupListCache+userID, time.Second*30*60, getJoinedGroupIDList) return strings.Split(joinedGroupIDListStr, ","), err } @@ -71,7 +96,7 @@ func GetGroupOwnerFromCache(groupID string) (string, error) { } return groupOwner.UserID, err } - groupOwnerID, err := db.DB.Rc.Fetch(groupOwnerIDCache+groupID, time.Second*30, getGroupOwnerIDList) + groupOwnerID, err := db.DB.Rc.Fetch(groupOwnerIDCache+groupID, time.Second*30*60, getGroupOwnerIDList) return groupOwnerID, err } @@ -84,7 +109,7 @@ func GetGroupMemberIDListFromCache(groupID string) ([]string, error) { groupMemberIDList, err := imdb.GetGroupMemberIDListByGroupID(groupID) return strings.Join(groupMemberIDList, ","), err } - groupIDListStr, err := db.DB.Rc.Fetch(groupCache+groupID, time.Second*30, getGroupMemberIDList) + groupIDListStr, err := db.DB.Rc.Fetch(groupCache+groupID, time.Second*30*60, getGroupMemberIDList) return strings.Split(groupIDListStr, ","), err } @@ -102,7 +127,7 @@ func GetUserInfoFromCache(userID string) (*db.User, error) { bytes, err := json.Marshal(userInfo) return string(bytes), err } - userInfoStr, err := db.DB.Rc.Fetch(userInfoCache+userID, time.Second*30, getUserInfo) + userInfoStr, err := db.DB.Rc.Fetch(userInfoCache+userID, time.Second*30*60, getUserInfo) if err != nil { return nil, err } @@ -124,7 +149,7 @@ func GetGroupMemberInfoFromCache(groupID, userID string) (*db.GroupMember, error bytes, err := json.Marshal(groupMemberInfo) return string(bytes), err } - groupMemberInfoStr, err := db.DB.Rc.Fetch(groupMemberInfoCache+groupID+"-"+userID, time.Second*30, getGroupMemberInfo) + groupMemberInfoStr, err := db.DB.Rc.Fetch(groupMemberInfoCache+groupID+"-"+userID, time.Second*30*60, getGroupMemberInfo) if err != nil { return nil, err } @@ -146,7 +171,7 @@ func GetAllGroupMembersInfoFromCache(groupID string) ([]*db.GroupMember, error) bytes, err := json.Marshal(groupMembers) return string(bytes), err } - groupMembersStr, err := db.DB.Rc.Fetch(groupAllMemberInfoCache+groupID, time.Second*30, getGroupMemberInfo) + groupMembersStr, err := db.DB.Rc.Fetch(groupAllMemberInfoCache+groupID, time.Second*30*60, getGroupMemberInfo) if err != nil { return nil, err } @@ -168,7 +193,7 @@ func GetGroupInfoFromCache(groupID string) (*db.Group, error) { bytes, err := json.Marshal(groupInfo) return string(bytes), err } - groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30, getGroupInfo) + groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) if err != nil { return nil, err } @@ -190,7 +215,7 @@ func GetAllFriendsInfoFromCache(userID string) ([]*db.Friend, error) { bytes, err := json.Marshal(friendInfoList) return string(bytes), err } - allFriendInfoStr, err := db.DB.Rc.Fetch(allFriendInfoCache+userID, time.Second*30, getAllFriendInfo) + allFriendInfoStr, err := db.DB.Rc.Fetch(allFriendInfoCache+userID, time.Second*30*60, getAllFriendInfo) if err != nil { return nil, err } @@ -212,7 +237,7 @@ func GetAllDepartmentsFromCache() ([]*db.Department, error) { bytes, err := json.Marshal(departmentList) return string(bytes), err } - allDepartmentsStr, err := db.DB.Rc.Fetch(allDepartmentCache, time.Second*30, getAllDepartments) + allDepartmentsStr, err := db.DB.Rc.Fetch(allDepartmentCache, time.Second*30*60, getAllDepartments) if err != nil { return nil, err } @@ -234,7 +259,7 @@ func GetAllDepartmentMembersFromCache() ([]*db.DepartmentMember, error) { bytes, err := json.Marshal(departmentMembers) return string(bytes), err } - allDepartmentMembersStr, err := db.DB.Rc.Fetch(allDepartmentMemberCache, time.Second*30, getAllDepartmentMembers) + allDepartmentMembersStr, err := db.DB.Rc.Fetch(allDepartmentMemberCache, time.Second*30*60, getAllDepartmentMembers) if err != nil { return nil, err } diff --git a/pkg/utils/cache_delete.go b/pkg/utils/cache_delete.go new file mode 100644 index 000000000..d4b585bf7 --- /dev/null +++ b/pkg/utils/cache_delete.go @@ -0,0 +1 @@ +package utils