mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-23 09:50:27 +08:00
errcode
This commit is contained in:
parent
87e61046cb
commit
68d595b558
43
pkg/common/db/cache/group.go
vendored
43
pkg/common/db/cache/group.go
vendored
@ -15,23 +15,25 @@ const GroupExpireTime = time.Second * 60 * 60 * 12
|
|||||||
const groupInfoCacheKey = "GROUP_INFO_CACHE:"
|
const groupInfoCacheKey = "GROUP_INFO_CACHE:"
|
||||||
|
|
||||||
type GroupCache struct {
|
type GroupCache struct {
|
||||||
db *relation.Group
|
group *relation.Group
|
||||||
expireTime time.Duration
|
groupMember *relation.GroupMember
|
||||||
redisClient *RedisClient
|
groupRequest *relation.GroupRequest
|
||||||
rcClient *rockscache.Client
|
expireTime time.Duration
|
||||||
|
redisClient *RedisClient
|
||||||
|
rcClient *rockscache.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGroupCache(rdb redis.UniversalClient, db *relation.Group, opts rockscache.Options) *GroupCache {
|
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, opts rockscache.Options) *GroupCache {
|
||||||
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, db: db, redisClient: NewRedisClient(rdb)}
|
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCache) getRedisClient() *RedisClient {
|
func (g *GroupCache) getRedisClient() *RedisClient {
|
||||||
return g.redisClient
|
return g.redisClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
||||||
for _, groupID := range groupIDs {
|
for _, groupID := range groupIDs {
|
||||||
group, err := g.GetGroupInfoFromCache(ctx, groupID)
|
group, err := g.GetGroupInfo(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -40,9 +42,9 @@ func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []stri
|
|||||||
return groups, nil
|
return groups, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
||||||
getGroup := func() (string, error) {
|
getGroup := func() (string, error) {
|
||||||
groupInfo, err := g.db.Take(ctx, groupID)
|
groupInfo, err := g.group.Take(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", utils.Wrap(err, "")
|
return "", utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
@ -64,16 +66,16 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string)
|
|||||||
return group, utils.Wrap(err, "")
|
return group, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
|
func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||||
}()
|
}()
|
||||||
return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
|
return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error {
|
func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
||||||
for _, groupID := range groupIDs {
|
for _, groupID := range groupIDs {
|
||||||
if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil {
|
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -83,3 +85,18 @@ func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []stri
|
|||||||
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {
|
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {
|
||||||
return groupInfoCacheKey + groupID
|
return groupInfoCacheKey + groupID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
if err := g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GroupCache) DelJoinedSuperGroupID(ctx context.Context, userID string) (err error) {
|
||||||
|
defer func() {
|
||||||
|
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
||||||
|
}()
|
||||||
|
return g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID)
|
||||||
|
}
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"Open_IM/pkg/common/db/unrelation"
|
"Open_IM/pkg/common/db/unrelation"
|
||||||
"context"
|
"context"
|
||||||
"github.com/dtm-labs/rockscache"
|
"github.com/dtm-labs/rockscache"
|
||||||
_ "github.com/dtm-labs/rockscache"
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
@ -66,16 +65,23 @@ type DataBase interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GroupDataBase struct {
|
type GroupDataBase struct {
|
||||||
sqlDB *relation.Group
|
groupDB *relation.Group
|
||||||
|
groupMemberDB *relation.GroupMember
|
||||||
|
groupRequestDB *relation.GroupRequest
|
||||||
|
|
||||||
cache *cache.GroupCache
|
cache *cache.GroupCache
|
||||||
mongoDB *unrelation.SuperGroupMgoDB
|
mongoDB *unrelation.SuperGroupMgoDB
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) DataBase {
|
func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) DataBase {
|
||||||
sqlDB := relation.NewGroupDB(db)
|
groupDB := relation.NewGroupDB(db)
|
||||||
|
groupMemberDB := relation.NewGroupMemberDB(db)
|
||||||
|
groupRequestDB := relation.NewGroupRequest(db)
|
||||||
database := &GroupDataBase{
|
database := &GroupDataBase{
|
||||||
sqlDB: sqlDB,
|
groupDB: groupDB,
|
||||||
cache: cache.NewGroupCache(rdb, sqlDB, rockscache.Options{
|
groupMemberDB: groupMemberDB,
|
||||||
|
groupRequestDB: groupRequestDB,
|
||||||
|
cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, rockscache.Options{
|
||||||
RandomExpireAdjustment: 0.2,
|
RandomExpireAdjustment: 0.2,
|
||||||
DisableCacheRead: false,
|
DisableCacheRead: false,
|
||||||
DisableCacheDelete: false,
|
DisableCacheDelete: false,
|
||||||
@ -87,19 +93,19 @@ func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Datab
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) Find(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
func (g *GroupDataBase) Find(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
||||||
return g.cache.GetGroupsInfoFromCache(ctx, groupIDs)
|
return g.cache.GetGroupsInfo(ctx, groupIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) Create(ctx context.Context, groups []*relation.Group) error {
|
func (g *GroupDataBase) Create(ctx context.Context, groups []*relation.Group) error {
|
||||||
return g.sqlDB.Create(ctx, groups)
|
return g.groupDB.Create(ctx, groups)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) Delete(ctx context.Context, groupIDs []string) error {
|
func (g *GroupDataBase) Delete(ctx context.Context, groupIDs []string) error {
|
||||||
return g.sqlDB.DB.Transaction(func(tx *gorm.DB) error {
|
return g.groupDB.DB.Transaction(func(tx *gorm.DB) error {
|
||||||
if err := g.sqlDB.Delete(ctx, groupIDs, tx); err != nil {
|
if err := g.groupDB.Delete(ctx, groupIDs, tx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
if err := g.cache.DelGroupsInfo(ctx, groupIDs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -107,19 +113,19 @@ func (g *GroupDataBase) Delete(ctx context.Context, groupIDs []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) Take(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
func (g *GroupDataBase) Take(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
||||||
return g.cache.GetGroupInfoFromCache(ctx, groupID)
|
return g.cache.GetGroupInfo(ctx, groupID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) error {
|
func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) error {
|
||||||
return g.sqlDB.DB.Transaction(func(tx *gorm.DB) error {
|
return g.groupDB.DB.Transaction(func(tx *gorm.DB) error {
|
||||||
if err := g.sqlDB.Update(ctx, groups, tx); err != nil {
|
if err := g.groupDB.Update(ctx, groups, tx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var groupIDs []string
|
var groupIDs []string
|
||||||
for _, group := range groups {
|
for _, group := range groups {
|
||||||
groupIDs = append(groupIDs, group.GroupID)
|
groupIDs = append(groupIDs, group.GroupID)
|
||||||
}
|
}
|
||||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
if err := g.cache.DelGroupsInfo(ctx, groupIDs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -127,7 +133,7 @@ func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||||
g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList, memberNumCount)
|
return g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList, memberNumCount, g.cache.DelJoinedSuperGroupIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) GetSuperGroup(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error) {
|
func (g *GroupDataBase) GetSuperGroup(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error) {
|
||||||
|
@ -27,6 +27,12 @@ type GroupMember struct {
|
|||||||
DB *gorm.DB
|
DB *gorm.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewGroupMemberDB(db *gorm.DB) *GroupMember {
|
||||||
|
return &GroupMember{
|
||||||
|
DB: db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GroupMember) Create(ctx context.Context, groupMemberList []*GroupMember) (err error) {
|
func (g *GroupMember) Create(ctx context.Context, groupMemberList []*GroupMember) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMemberList", groupMemberList)
|
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupMemberList", groupMemberList)
|
||||||
|
@ -28,9 +28,7 @@ type Group struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewGroupDB(db *gorm.DB) *Group {
|
func NewGroupDB(db *gorm.DB) *Group {
|
||||||
var group Group
|
return &Group{DB: db}
|
||||||
group.DB = db.Model(&Group{})
|
|
||||||
return &group
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Group) Create(ctx context.Context, groups []*Group) (err error) {
|
func (g *Group) Create(ctx context.Context, groups []*Group) (err error) {
|
||||||
|
@ -22,6 +22,13 @@ type GroupRequest struct {
|
|||||||
JoinSource int32 `gorm:"column:join_source"`
|
JoinSource int32 `gorm:"column:join_source"`
|
||||||
InviterUserID string `gorm:"column:inviter_user_id;size:64"`
|
InviterUserID string `gorm:"column:inviter_user_id;size:64"`
|
||||||
Ex string `gorm:"column:ex;size:1024"`
|
Ex string `gorm:"column:ex;size:1024"`
|
||||||
|
DB *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGroupRequest(db *gorm.DB) *GroupRequest {
|
||||||
|
return &GroupRequest{
|
||||||
|
DB: db,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (GroupRequest) TableName() string {
|
func (GroupRequest) TableName() string {
|
||||||
|
@ -34,7 +34,7 @@ func NewSuperGroupMgoDB(mgoDB *mongo.Database) *SuperGroupMgoDB {
|
|||||||
return &SuperGroupMgoDB{mgoDB: mgoDB, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)}
|
return &SuperGroupMgoDB{mgoDB: mgoDB, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *SuperGroupMgoDB) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error {
|
func (db *SuperGroupMgoDB) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int, cacheFunc func(ctx context.Context, userIDs []string) error) error {
|
||||||
//ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
//ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||||
//c := db.mgoDB.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
//c := db.mgoDB.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||||
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
|
||||||
@ -63,6 +63,12 @@ func (db *SuperGroupMgoDB) CreateSuperGroup(ctx context.Context, groupID string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if cacheFunc != nil {
|
||||||
|
if err = cacheFunc(ctx, initMemberIDList); err != nil {
|
||||||
|
_ = sCtx.AbortTransaction(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return sCtx.CommitTransaction(ctx)
|
return sCtx.CommitTransaction(ctx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user