mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-31 11:15:40 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
142b9386fc
@ -90,7 +90,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
|
||||
msgs, err := db.DB.GetUserMsgListByIndex(ID, index)
|
||||
if err != nil || msgs.UID == "" {
|
||||
if err != nil {
|
||||
if err == mongo.ErrMsgListNotExist {
|
||||
if err == mongoDB.ErrMsgListNotExist {
|
||||
log.NewInfo(operationID, utils.GetSelfFuncName(), "ID:", ID, "index:", index, err.Error())
|
||||
} else {
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID)
|
||||
@ -104,7 +104,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
|
||||
return delStruct.getSetMinSeq() + 1, nil
|
||||
}
|
||||
log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
|
||||
if len(msgs.Msg) > mongo.GetSingleGocMsgNum() {
|
||||
if len(msgs.Msg) > mongoDB.GetSingleGocMsgNum() {
|
||||
log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID)
|
||||
}
|
||||
if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) {
|
||||
@ -150,7 +150,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
|
||||
return seq, utils.Wrap(err, "deleteMongoMsg failed")
|
||||
}
|
||||
|
||||
func msgListIsFull(chat *mongo.UserChat) bool {
|
||||
func msgListIsFull(chat *mongoDB.UserChat) bool {
|
||||
index, _ := strconv.Atoi(strings.Split(chat.UID, ":")[1])
|
||||
if index == 0 {
|
||||
if len(chat.Msg) >= 4999 {
|
||||
|
@ -71,14 +71,14 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
}
|
||||
if !notification.IsReact {
|
||||
// first time to modify
|
||||
var reactionExtensionList = make(map[string]mongo.KeyValue)
|
||||
extendMsg := mongo.ExtendMsg{
|
||||
var reactionExtensionList = make(map[string]mongoDB.KeyValue)
|
||||
extendMsg := mongoDB.ExtendMsg{
|
||||
ReactionExtensionList: reactionExtensionList,
|
||||
ClientMsgID: notification.ClientMsgID,
|
||||
MsgFirstModifyTime: notification.MsgFirstModifyTime,
|
||||
}
|
||||
for _, v := range notification.SuccessReactionExtensionList {
|
||||
reactionExtensionList[v.TypeKey] = mongo.KeyValue{
|
||||
reactionExtensionList[v.TypeKey] = mongoDB.KeyValue{
|
||||
TypeKey: v.TypeKey,
|
||||
Value: v.Value,
|
||||
LatestUpdateTime: v.LatestUpdateTime,
|
||||
|
@ -4,7 +4,7 @@ import (
|
||||
cbApi "Open_IM/pkg/callback_struct"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
imdb "Open_IM/pkg/common/db/mysql"
|
||||
"Open_IM/pkg/common/http"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/trace_log"
|
||||
@ -77,7 +77,7 @@ func callbackBeforeCreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq)
|
||||
return err
|
||||
}
|
||||
|
||||
func CallbackBeforeMemberJoinGroup(ctx context.Context, operationID string, groupMember *im_mysql_model.GroupMember, groupEx string) (err error) {
|
||||
func CallbackBeforeMemberJoinGroup(ctx context.Context, operationID string, groupMember *imdb.GroupMember, groupEx string) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxInfo(ctx, utils.GetFuncName(1), err, "groupMember", *groupMember, "groupEx", groupEx)
|
||||
}()
|
||||
|
@ -2,7 +2,7 @@ package group
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
imdb "Open_IM/pkg/common/db/mysql"
|
||||
"Open_IM/pkg/common/tools"
|
||||
pbGroup "Open_IM/pkg/proto/group"
|
||||
sdk "Open_IM/pkg/proto/sdk_ws"
|
||||
|
@ -5,16 +5,16 @@ import (
|
||||
chat "Open_IM/internal/rpc/msg"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
rocksCache "Open_IM/pkg/common/db/rocks_cache"
|
||||
"Open_IM/pkg/common/db/model"
|
||||
imdb "Open_IM/pkg/common/db/mysql"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/middleware"
|
||||
promePkg "Open_IM/pkg/common/prometheus"
|
||||
"Open_IM/pkg/common/token_verify"
|
||||
"Open_IM/pkg/common/tools"
|
||||
"Open_IM/pkg/common/trace_log"
|
||||
cp "Open_IM/pkg/common/utils"
|
||||
|
||||
cp "Open_IM/internal/utils"
|
||||
"Open_IM/pkg/getcdv3"
|
||||
pbCache "Open_IM/pkg/proto/cache"
|
||||
pbConversation "Open_IM/pkg/proto/conversation"
|
||||
@ -41,6 +41,8 @@ type groupServer struct {
|
||||
rpcRegisterName string
|
||||
etcdSchema string
|
||||
etcdAddr []string
|
||||
imdb.GroupInterface
|
||||
|
||||
}
|
||||
|
||||
func NewGroupServer(port int) *groupServer {
|
||||
@ -131,39 +133,26 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
||||
for _, adminUserID := range req.AdminUserIDs {
|
||||
userIDs = append(userIDs, adminUserID)
|
||||
}
|
||||
if utils.IsRepeatID(userIDs) {
|
||||
if utils.IsDuplicateID(userIDs) {
|
||||
return nil, constant.ErrArgs.Wrap("group member is repeated")
|
||||
}
|
||||
|
||||
users, err := getUsersInfo(ctx, userIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
userMap := make(map[string]*open_im_sdk.UserInfo)
|
||||
for i, user := range users {
|
||||
userMap[user.UserID] = users[i]
|
||||
}
|
||||
|
||||
if err := callbackBeforeCreateGroup(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupId := genGroupID(ctx, req.GroupInfo.GroupID)
|
||||
|
||||
groupInfo := imdb.Group{}
|
||||
utils.CopyStructFields(&groupInfo, req.GroupInfo)
|
||||
groupInfo.CreatorUserID = tools.OpUserID(ctx)
|
||||
groupInfo.GroupID = groupId
|
||||
groupInfo.CreateTime = time.Now()
|
||||
if groupInfo.NotificationUpdateTime.Unix() < 0 {
|
||||
groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0)
|
||||
}
|
||||
|
||||
groupInfo, err := (&cp.PBGroup{req.GroupInfo}).Convert()
|
||||
groupInfo.GroupID = genGroupID(ctx, req.GroupInfo.GroupID)
|
||||
if req.GroupInfo.GroupType != constant.SuperGroup {
|
||||
|
||||
var groupMembers []*imdb.GroupMember
|
||||
joinGroup := func(userID string, roleLevel int32) error {
|
||||
groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: tools.OpUserID(ctx), JoinSource: constant.JoinByInvitation, InviterUserID: tools.OpUserID(ctx)}
|
||||
groupMember := &imdb.GroupMember{GroupID: groupInfo.GroupID, RoleLevel: roleLevel, OperatorUserID: tools.OpUserID(ctx), JoinSource: constant.JoinByInvitation, InviterUserID: tools.OpUserID(ctx)}
|
||||
user := userMap[userID]
|
||||
utils.CopyStructFields(&groupMember, user)
|
||||
if err := CallbackBeforeMemberJoinGroup(ctx, tools.OperationID(ctx), groupMember, groupInfo.Ex); err != nil {
|
||||
@ -172,17 +161,22 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
||||
groupMembers = append(groupMembers, groupMember)
|
||||
return nil
|
||||
}
|
||||
if req.OwnerUserID == "" {
|
||||
if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, info := range req.InitMemberList {
|
||||
if err := joinGroup(info.UserID, info.RoleLevel); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, info := range req.InitMemberList {
|
||||
if err := joinGroup(info, constant.GroupOrdinaryUsers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, info := range req.AdminUserIDs {
|
||||
if err := joinGroup(info, constant.GroupAdmin); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err := model.
|
||||
if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -297,7 +297,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR
|
||||
}
|
||||
}
|
||||
|
||||
var tagSendLogs mongo.TagSendLog
|
||||
var tagSendLogs mongoDB.TagSendLog
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(successUserIDList))
|
||||
var lock sync.Mutex
|
||||
@ -310,7 +310,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR
|
||||
return
|
||||
}
|
||||
lock.Lock()
|
||||
tagSendLogs.UserList = append(tagSendLogs.UserList, mongo.TagUser{
|
||||
tagSendLogs.UserList = append(tagSendLogs.UserList, mongoDB.TagUser{
|
||||
UserID: userID,
|
||||
UserName: userName,
|
||||
})
|
||||
@ -389,10 +389,10 @@ func (s *officeServer) GetUserTagByID(_ context.Context, req *pbOffice.GetUserTa
|
||||
func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.CreateOneWorkMomentReq) (resp *pbOffice.CreateOneWorkMomentResp, err error) {
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||
resp = &pbOffice.CreateOneWorkMomentResp{CommonResp: &pbOffice.CommonResp{}}
|
||||
workMoment := mongo.WorkMoment{
|
||||
Comments: []*mongo.Comment{},
|
||||
LikeUserList: []*mongo.WorkMomentUser{},
|
||||
PermissionUserList: []*mongo.WorkMomentUser{},
|
||||
workMoment := mongoDB.WorkMoment{
|
||||
Comments: []*mongoDB.Comment{},
|
||||
LikeUserList: []*mongoDB.WorkMomentUser{},
|
||||
PermissionUserList: []*mongoDB.WorkMomentUser{},
|
||||
}
|
||||
createUser, err := imdb.GetUserByUserID(req.WorkMoment.UserID)
|
||||
if err != nil {
|
||||
@ -406,14 +406,14 @@ func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.Crea
|
||||
workMoment.UserName = createUser.Nickname
|
||||
workMoment.FaceURL = createUser.FaceURL
|
||||
workMoment.PermissionUserIDList = s.getPermissionUserIDList(req.OperationID, req.WorkMoment.PermissionGroupList, req.WorkMoment.PermissionUserList)
|
||||
workMoment.PermissionUserList = []*mongo.WorkMomentUser{}
|
||||
workMoment.PermissionUserList = []*mongoDB.WorkMomentUser{}
|
||||
for _, userID := range workMoment.PermissionUserIDList {
|
||||
userName, err := imdb.GetUserNameByUserID(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error())
|
||||
continue
|
||||
}
|
||||
workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongo.WorkMomentUser{
|
||||
workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongoDB.WorkMomentUser{
|
||||
UserID: userID,
|
||||
UserName: userName,
|
||||
})
|
||||
@ -503,7 +503,7 @@ func (s *officeServer) DeleteOneWorkMoment(_ context.Context, req *pbOffice.Dele
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func isUserCanSeeWorkMoment(userID string, workMoment mongo.WorkMoment) bool {
|
||||
func isUserCanSeeWorkMoment(userID string, workMoment mongoDB.WorkMoment) bool {
|
||||
if userID != workMoment.UserID {
|
||||
switch workMoment.Permission {
|
||||
case constant.WorkMomentPublic:
|
||||
@ -570,7 +570,7 @@ func (s *officeServer) CommentOneWorkMoment(_ context.Context, req *pbOffice.Com
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
comment := &mongo.Comment{
|
||||
comment := &mongoDB.Comment{
|
||||
UserID: req.UserID,
|
||||
UserName: commentUser.Nickname,
|
||||
ReplyUserID: req.ReplyUserID,
|
||||
@ -644,7 +644,7 @@ func (s *officeServer) GetUserWorkMoments(_ context.Context, req *pbOffice.GetUs
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||
resp = &pbOffice.GetUserWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}}
|
||||
resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber}
|
||||
var workMoments []mongo.WorkMoment
|
||||
var workMoments []mongoDB.WorkMoment
|
||||
if req.UserID == req.OpUserID {
|
||||
workMoments, err = db.DB.GetUserSelfWorkMoments(req.UserID, req.Pagination.ShowNumber, req.Pagination.PageNumber)
|
||||
} else {
|
||||
|
@ -1,7 +1,7 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
imdb "Open_IM/pkg/common/db/mysql"
|
||||
sdk "Open_IM/pkg/proto/sdk_ws"
|
||||
utils2 "Open_IM/pkg/utils"
|
||||
utils "github.com/OpenIMSDK/open_utils"
|
||||
@ -120,7 +120,7 @@ type PBGroup struct {
|
||||
*sdk.GroupInfo
|
||||
}
|
||||
|
||||
func (pb *PBGroup) convert() (*imdb.Group, error) {
|
||||
func (pb *PBGroup) Convert() (*imdb.Group, error) {
|
||||
dst := &imdb.Group{}
|
||||
utils.CopyStructFields(dst, pb)
|
||||
return dst, nil
|
||||
@ -207,44 +207,31 @@ func (db *DBGroupRequest) convert() (*sdk.GroupRequest, error) {
|
||||
}
|
||||
|
||||
type DBUser struct {
|
||||
*imdb
|
||||
*imdb.User
|
||||
}
|
||||
|
||||
type PBUser struct {
|
||||
*sdk.UserInfo
|
||||
}
|
||||
|
||||
func (pb *PBUser) convert() (*DBUser, error) {
|
||||
dst := &DBUser{}
|
||||
func (pb *PBUser) convert() (*imdb.User, error) {
|
||||
dst := &imdb.User{}
|
||||
utils.CopyStructFields(dst, pb)
|
||||
|
||||
utils.CopyStructFields(dst, src)
|
||||
dst.Birth, _ = utils.TimeStringToTime(src.BirthStr)
|
||||
dst.CreateTime = utils.UnixSecondToTime(int64(src.CreateTime))
|
||||
|
||||
dst.Birth = utils.UnixSecondToTime(pb.Birthday)
|
||||
dst.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime))
|
||||
return dst, nil
|
||||
}
|
||||
func (db *DBUser) convert() (*PBUser, error) {
|
||||
dst := &sdk.GroupRequest{}
|
||||
|
||||
func (db *DBUser) convert() (*sdk.UserInfo, error) {
|
||||
dst := &sdk.UserInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
dst.ReqTime = uint32(db.ReqTime.Unix())
|
||||
dst.HandleTime = uint32(db.HandledTime.Unix())
|
||||
dst.CreateTime = uint32(db.CreateTime.Unix())
|
||||
dst.Birthday = db.Birth.Unix()
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
func UserOpenIMCopyDB(dst *imdb.User, src *sdk.UserInfo) {
|
||||
utils.CopyStructFields(dst, src)
|
||||
dst.Birth, _ = utils.TimeStringToTime(src.BirthStr)
|
||||
dst.CreateTime = utils.UnixSecondToTime(int64(src.CreateTime))
|
||||
}
|
||||
|
||||
func UserDBCopyOpenIM(dst *open_im_sdk.UserInfo, src *imdb.User) {
|
||||
utils.CopyStructFields(dst, src)
|
||||
dst.CreateTime = uint32(src.CreateTime.Unix())
|
||||
//dst.Birth = uint32(src.Birth.Unix())
|
||||
dst.BirthStr = utils2.TimeToString(src.Birth)
|
||||
}
|
||||
|
||||
func UserDBCopyOpenIMPublicUser(dst *open_im_sdk.PublicUserInfo, src *imdb.User) {
|
||||
utils.CopyStructFields(dst, src)
|
||||
func (db *DBUser) convertPublic() (*sdk.PublicUserInfo, error) {
|
||||
dst := &sdk.PublicUserInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
return dst, nil
|
||||
}
|
||||
|
33
pkg/common/db/cache/group.go
vendored
33
pkg/common/db/cache/group.go
vendored
@ -15,14 +15,22 @@ const GroupExpireTime = time.Second * 60 * 60 * 12
|
||||
const groupInfoCacheKey = "GROUP_INFO_CACHE:"
|
||||
|
||||
type GroupCache struct {
|
||||
Client *Client
|
||||
db *mysql.Group
|
||||
expireTime time.Duration
|
||||
db mysql.GroupModelInterface
|
||||
expireTime time.Duration
|
||||
redisClient *RedisClient
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewGroupRc(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) GroupCache {
|
||||
rcClient := newClient(rdb, opts)
|
||||
return GroupCache{Client: rcClient, expireTime: GroupExpireTime}
|
||||
func NewGroupCache(rdb redis.UniversalClient, db mysql.GroupModelInterface, opts rockscache.Options) *GroupCache {
|
||||
rcClient := &rockscache.Client{
|
||||
Options: rockscache.Options{},
|
||||
}
|
||||
redisClient := NewRedisClient(rdb)
|
||||
return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient}
|
||||
}
|
||||
|
||||
func (g *GroupCache) getRedisClient() *RedisClient {
|
||||
return g.redisClient
|
||||
}
|
||||
|
||||
func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) {
|
||||
@ -52,7 +60,7 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string)
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
|
||||
}()
|
||||
groupStr, err := g.Client.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup)
|
||||
groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
@ -64,7 +72,16 @@ func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string)
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||
}()
|
||||
return g.Client.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
|
||||
return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
|
||||
}
|
||||
|
||||
func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error {
|
||||
for _, groupID := range groupIDs {
|
||||
if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {
|
||||
|
11
pkg/common/db/cache/redis.go
vendored
11
pkg/common/db/cache/redis.go
vendored
@ -42,9 +42,10 @@ const (
|
||||
exTypeKeyLocker = "EX_LOCK:"
|
||||
)
|
||||
|
||||
func InitRedis(ctx context.Context) go_redis.UniversalClient {
|
||||
func InitRedis() go_redis.UniversalClient {
|
||||
var rdb go_redis.UniversalClient
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
if config.Config.Redis.EnableCluster {
|
||||
rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
|
||||
Addrs: config.Config.Redis.DBAddress,
|
||||
@ -73,14 +74,14 @@ func InitRedis(ctx context.Context) go_redis.UniversalClient {
|
||||
return rdb
|
||||
}
|
||||
|
||||
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
|
||||
return &RedisClient{rdb: rdb}
|
||||
}
|
||||
|
||||
type RedisClient struct {
|
||||
rdb go_redis.UniversalClient
|
||||
}
|
||||
|
||||
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
|
||||
return &RedisClient{rdb: rdb}
|
||||
}
|
||||
|
||||
func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) {
|
||||
key := accountTempCode + account
|
||||
n, err := r.rdb.Exists(context.Background(), key).Result()
|
||||
|
90
pkg/common/db/cache/rockscache.go
vendored
90
pkg/common/db/cache/rockscache.go
vendored
@ -11,6 +11,7 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"gorm.io/gorm"
|
||||
"math/big"
|
||||
"sort"
|
||||
"strconv"
|
||||
@ -18,11 +19,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
userInfoCache = "USER_INFO_CACHE:"
|
||||
friendRelationCache = "FRIEND_RELATION_CACHE:"
|
||||
blackListCache = "BLACK_LIST_CACHE:"
|
||||
groupCache = "GROUP_CACHE:"
|
||||
groupInfoCache = "GROUP_INFO_CACHE:"
|
||||
userInfoCache = "USER_INFO_CACHE:"
|
||||
friendRelationCache = "FRIEND_RELATION_CACHE:"
|
||||
blackListCache = "BLACK_LIST_CACHE:"
|
||||
groupCache = "GROUP_CACHE:"
|
||||
//groupInfoCache = "GROUP_INFO_CACHE:"
|
||||
groupOwnerIDCache = "GROUP_OWNER_ID:"
|
||||
joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
|
||||
groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
|
||||
@ -38,19 +39,10 @@ const (
|
||||
)
|
||||
|
||||
const scanCount = 3000
|
||||
|
||||
type RcClient struct {
|
||||
rdb redis.UniversalClient
|
||||
Cache *rockscache.Client
|
||||
ExpireTime time.Duration
|
||||
}
|
||||
|
||||
func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient {
|
||||
return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime}
|
||||
}
|
||||
const RandomExpireAdjustment = 0.2
|
||||
|
||||
func (rc *RcClient) DelKeys() {
|
||||
for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache,
|
||||
for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache,
|
||||
groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} {
|
||||
fName := utils.GetSelfFuncName()
|
||||
var cursor uint64
|
||||
@ -80,7 +72,7 @@ func (rc *RcClient) DelKeys() {
|
||||
}
|
||||
}
|
||||
|
||||
func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) {
|
||||
func (rc *Client) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) {
|
||||
getFriendIDList := func() (string, error) {
|
||||
friendIDList, err := mysql.GetFriendIDListByUserID(userID)
|
||||
if err != nil {
|
||||
@ -364,36 +356,36 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e
|
||||
return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID)
|
||||
}
|
||||
|
||||
func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
|
||||
getGroupInfo := func() (string, error) {
|
||||
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
bytes, err := json.Marshal(groupInfo)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
groupInfo = &mysql.Group{}
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
|
||||
}()
|
||||
groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
|
||||
return groupInfo, utils.Wrap(err, "")
|
||||
}
|
||||
|
||||
func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||
}()
|
||||
return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID)
|
||||
}
|
||||
//func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
|
||||
// getGroupInfo := func() (string, error) {
|
||||
// groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// bytes, err := json.Marshal(groupInfo)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// return string(bytes), nil
|
||||
// }
|
||||
// groupInfo = &mysql.Group{}
|
||||
// defer func() {
|
||||
// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
|
||||
// }()
|
||||
// groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo)
|
||||
// if err != nil {
|
||||
// return nil, utils.Wrap(err, "")
|
||||
// }
|
||||
// err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
|
||||
// return groupInfo, utils.Wrap(err, "")
|
||||
//}
|
||||
//
|
||||
//func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
|
||||
// defer func() {
|
||||
// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||
// }()
|
||||
// return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID)
|
||||
//}
|
||||
|
||||
func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) {
|
||||
getAllFriendInfo := func() (string, error) {
|
||||
@ -616,7 +608,7 @@ func DelConversationFromCache(ctx context.Context, ownerUserID, conversationID s
|
||||
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
|
||||
}
|
||||
|
||||
func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongo.ExtendMsg, err error) {
|
||||
func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongoDB.ExtendMsg, err error) {
|
||||
getExtendMsg := func() (string, error) {
|
||||
extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
|
||||
if err != nil {
|
||||
@ -636,7 +628,7 @@ func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clien
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "Fetch failed")
|
||||
}
|
||||
extendMsg = &mongo.ExtendMsg{}
|
||||
extendMsg = &mongoDB.ExtendMsg{}
|
||||
err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
|
||||
return extendMsg, utils.Wrap(err, "Unmarshal failed")
|
||||
}
|
||||
|
@ -2,42 +2,42 @@ package model
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/db/cache"
|
||||
"Open_IM/pkg/common/db/mongo"
|
||||
"Open_IM/pkg/common/db/mongoDB"
|
||||
"Open_IM/pkg/common/db/mysql"
|
||||
"Open_IM/pkg/common/trace_log"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
//"github.com/dtm-labs/rockscache"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"gorm.io/gorm"
|
||||
//"time"
|
||||
)
|
||||
|
||||
type GroupModel struct {
|
||||
db *mysql.Group
|
||||
cache *cache.GroupCache
|
||||
mongo *mongo.Client
|
||||
type GroupInterface interface {
|
||||
Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error)
|
||||
Create(ctx context.Context, groups []*mysql.Group) error
|
||||
Delete(ctx context.Context, groupIDs []string) error
|
||||
Take(ctx context.Context, groupID string) (group *mysql.Group, err error)
|
||||
}
|
||||
|
||||
type GroupModel struct {
|
||||
db mysql.GroupModelInterface
|
||||
cache *cache.GroupCache
|
||||
mongo *mongoDB.Client
|
||||
}
|
||||
|
||||
func NewGroupModel() {
|
||||
func NewGroupModel(db mysql.GroupModelInterface, rdb redis.UniversalClient, mdb *mongo.Client) *GroupModel {
|
||||
var groupModel GroupModel
|
||||
redisClient := cache.InitRedis()
|
||||
rdb := cache.NewRedisClient(redisClient)
|
||||
groupModel.db = mysql.NewGroupDB()
|
||||
//mgo := mongo.In()
|
||||
groupModel.db = db
|
||||
groupModel.cache = cache.NewGroupCache(rdb, db, rockscache.Options{
|
||||
DisableCacheRead: false,
|
||||
StrongConsistency: true,
|
||||
})
|
||||
groupModel.mongo = mongoDB.NewMongoClient(mdb)
|
||||
return &groupModel
|
||||
}
|
||||
|
||||
func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) {
|
||||
g.cache.Client.
|
||||
for _, groupID := range groupIDs {
|
||||
group, err := g.getGroupInfoFromCache(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groups = append(groups, group)
|
||||
}
|
||||
return groups, nil
|
||||
return g.cache.GetGroupsInfoFromCache(ctx, groupIDs)
|
||||
}
|
||||
|
||||
func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error {
|
||||
@ -49,7 +49,7 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error {
|
||||
if err := g.db.Delete(ctx, groupIDs, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil {
|
||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -57,35 +57,6 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error {
|
||||
for _, groupID := range groupIDs {
|
||||
if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
|
||||
getGroupInfo := func() (string, error) {
|
||||
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
bytes, err := json.Marshal(groupInfo)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
groupInfo = &mysql.Group{}
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
|
||||
}()
|
||||
groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
|
||||
return groupInfo, utils.Wrap(err, "")
|
||||
func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) {
|
||||
return g.cache.GetGroupInfoFromCache(ctx, groupID)
|
||||
}
|
||||
|
@ -9,9 +9,10 @@ type UserModel struct {
|
||||
db *mysql.User
|
||||
}
|
||||
|
||||
func NewGroupUser(ctx context.Context) {
|
||||
func NewGroupUser(ctx context.Context) *UserModel {
|
||||
var userModel UserModel
|
||||
userModel.db = mysql.NewUserDB()
|
||||
return &userModel
|
||||
}
|
||||
|
||||
func (u *UserModel) Find(ctx context.Context, userIDs []string) (users []*mysql.User, err error) {
|
||||
|
@ -1 +0,0 @@
|
||||
package mongo
|
@ -1,4 +1,4 @@
|
||||
package mongo
|
||||
package mongoDB
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
@ -1,4 +1,4 @@
|
||||
package mongo
|
||||
package mongoDB
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
@ -1,4 +1,4 @@
|
||||
package mongo
|
||||
package mongoDB
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
@ -13,7 +13,15 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func InitMongoClient() *mongo.Client {
|
||||
type Client struct {
|
||||
mongo *mongo.Client
|
||||
}
|
||||
|
||||
func NewMongoClient(mdb *mongo.Client) *Client {
|
||||
return &Client{mongo: mdb}
|
||||
}
|
||||
|
||||
func initMongo() *mongo.Client {
|
||||
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
|
||||
if config.Config.Mongo.DBUri != "" {
|
||||
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
|
||||
@ -50,6 +58,14 @@ func InitMongoClient() *mongo.Client {
|
||||
panic(err.Error() + " mongo.Connect failed " + uri)
|
||||
}
|
||||
}
|
||||
return mongoClient
|
||||
}
|
||||
|
||||
func GetCollection(mongoClient *mongo.Client) {
|
||||
|
||||
}
|
||||
|
||||
func CreateAllIndex(mongoClient *mongo.Client) {
|
||||
// mongodb create index
|
||||
if err := createMongoIndex(mongoClient, cSendLog, false, "send_id", "-send_time"); err != nil {
|
||||
panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time")
|
||||
@ -72,7 +88,6 @@ func InitMongoClient() *mongo.Client {
|
||||
if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil {
|
||||
panic(err.Error() + "index create failed " + cTag + " tag_id")
|
||||
}
|
||||
return mongoClient
|
||||
}
|
||||
|
||||
func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error {
|
@ -1,4 +1,4 @@
|
||||
package mongo
|
||||
package mongoDB
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
@ -1091,8 +1091,7 @@ func (d *db.DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, us
|
||||
}
|
||||
|
||||
type SuperGroup struct {
|
||||
GroupID string `bson:"group_id" json:"groupID"`
|
||||
//MemberNumCount int `bson:"member_num_count"`
|
||||
GroupID string `bson:"group_id" json:"groupID"`
|
||||
MemberIDList []string `bson:"member_id_list" json:"memberIDList"`
|
||||
}
|
||||
|
1
pkg/common/db/mongoDB/office.go
Normal file
1
pkg/common/db/mongoDB/office.go
Normal file
@ -0,0 +1 @@
|
||||
package mongoDB
|
11
pkg/common/db/mongoDB/super_group.go
Normal file
11
pkg/common/db/mongoDB/super_group.go
Normal file
@ -0,0 +1,11 @@
|
||||
package mongoDB
|
||||
|
||||
type SuperGroup struct {
|
||||
GroupID string `bson:"group_id" json:"groupID"`
|
||||
MemberIDList []string `bson:"member_id_list" json:"memberIDList"`
|
||||
}
|
||||
|
||||
type UserToSuperGroup struct {
|
||||
UserID string `bson:"user_id" json:"userID"`
|
||||
GroupIDList []string `bson:"group_id_list" json:"groupIDList"`
|
||||
}
|
@ -8,6 +8,14 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type GroupModelInterface interface {
|
||||
Create(ctx context.Context, groups []*Group) (err error)
|
||||
Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error)
|
||||
UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error)
|
||||
Update(ctx context.Context, groups []*Group) (err error)
|
||||
Find(ctx context.Context, groupIDs []string) (groups []*Group, err error)
|
||||
Take(ctx context.Context, groupID string) (group *Group, err error)
|
||||
}
|
||||
type Group struct {
|
||||
GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"`
|
||||
GroupName string `gorm:"column:name;size:255" json:"groupName"`
|
||||
@ -29,7 +37,8 @@ type Group struct {
|
||||
|
||||
func NewGroupDB() *Group {
|
||||
var group Group
|
||||
group.DB = initMysqlDB(&group)
|
||||
db := ConnectToDB()
|
||||
db = InitModel(db, &group)
|
||||
return &group
|
||||
}
|
||||
|
||||
@ -41,11 +50,11 @@ func (*Group) Create(ctx context.Context, groups []*Group) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (*Group) Delete(ctx context.Context, groupIDs []string) (err error) {
|
||||
func (g *Group) Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs)
|
||||
}()
|
||||
return utils.Wrap(GroupDB.Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx...).Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "")
|
||||
}
|
||||
|
||||
func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) {
|
||||
|
@ -122,7 +122,7 @@ func RemoveDuplicateElement(idList []string) []string {
|
||||
return result
|
||||
}
|
||||
|
||||
func IsRepeatStringSlice(arr []string) bool {
|
||||
func IsDuplicateStringSlice(arr []string) bool {
|
||||
t := make(map[string]struct{})
|
||||
for _, s := range arr {
|
||||
if _, ok := t[s]; ok {
|
||||
@ -133,6 +133,6 @@ func IsRepeatStringSlice(arr []string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func IsRepeatID(args ...interface{}) bool {
|
||||
func IsDuplicateID(args ...interface{}) bool {
|
||||
return false
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user