mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-01 04:29:20 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
6e26c0d986
@ -41,7 +41,7 @@ func (db *DBFriend) convert() (*sdk.FriendInfo, error) {
|
||||
return pbFriend, nil
|
||||
}
|
||||
|
||||
func (pb *PBFriend) convert() (*imdb.Friend, error) {
|
||||
func (pb *PBFriend) Convert() (*imdb.Friend, error) {
|
||||
dbFriend := &imdb.Friend{}
|
||||
utils2.CopyStructFields(dbFriend, pb)
|
||||
dbFriend.FriendUserID = pb.FriendUser.UserID
|
||||
@ -57,14 +57,14 @@ type PBFriendRequest struct {
|
||||
*sdk.FriendRequest
|
||||
}
|
||||
|
||||
func (pb *PBFriendRequest) convert() (*imdb.FriendRequest, error) {
|
||||
func (pb *PBFriendRequest) Convert() (*imdb.FriendRequest, error) {
|
||||
dbFriendRequest := &imdb.FriendRequest{}
|
||||
utils.CopyStructFields(dbFriendRequest, pb)
|
||||
dbFriendRequest.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime))
|
||||
dbFriendRequest.HandleTime = utils.UnixSecondToTime(int64(pb.HandleTime))
|
||||
return dbFriendRequest, nil
|
||||
}
|
||||
func (db *DBFriendRequest) convert() (*sdk.FriendRequest, error) {
|
||||
func (db *DBFriendRequest) Convert() (*sdk.FriendRequest, error) {
|
||||
pbFriendRequest := &sdk.FriendRequest{}
|
||||
utils.CopyStructFields(pbFriendRequest, db)
|
||||
user, err := getUsersInfo([]string{db.FromUserID})
|
||||
@ -94,13 +94,13 @@ type PBBlack struct {
|
||||
*sdk.BlackInfo
|
||||
}
|
||||
|
||||
func (pb *PBBlack) convert() (*imdb.Black, error) {
|
||||
func (pb *PBBlack) Convert() (*imdb.Black, error) {
|
||||
dbBlack := &imdb.Black{}
|
||||
dbBlack.BlockUserID = pb.BlackUserInfo.UserID
|
||||
dbBlack.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime))
|
||||
return dbBlack, nil
|
||||
}
|
||||
func (db *DBBlack) convert() (*sdk.BlackInfo, error) {
|
||||
func (db *DBBlack) Convert() (*sdk.BlackInfo, error) {
|
||||
pbBlack := &sdk.BlackInfo{}
|
||||
utils.CopyStructFields(pbBlack, db)
|
||||
pbBlack.CreateTime = uint32(db.CreateTime.Unix())
|
||||
@ -125,7 +125,7 @@ func (pb *PBGroup) Convert() (*imdb.Group, error) {
|
||||
utils.CopyStructFields(dst, pb)
|
||||
return dst, nil
|
||||
}
|
||||
func (db *DBGroup) convert() (*sdk.GroupInfo, error) {
|
||||
func (db *DBGroup) Convert() (*sdk.GroupInfo, error) {
|
||||
dst := &sdk.GroupInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
user, err := getGroupOwnerInfo(db.GroupID)
|
||||
@ -155,14 +155,14 @@ type PBGroupMember struct {
|
||||
*sdk.GroupMemberFullInfo
|
||||
}
|
||||
|
||||
func (pb *PBGroupMember) convert() (*imdb.GroupMember, error) {
|
||||
func (pb *PBGroupMember) Convert() (*imdb.GroupMember, error) {
|
||||
dst := &imdb.GroupMember{}
|
||||
utils.CopyStructFields(dst, pb)
|
||||
dst.JoinTime = utils.UnixSecondToTime(int64(pb.JoinTime))
|
||||
dst.MuteEndTime = utils.UnixSecondToTime(int64(pb.MuteEndTime))
|
||||
return dst, nil
|
||||
}
|
||||
func (db *DBGroupMember) convert() (*sdk.GroupMemberFullInfo, error) {
|
||||
func (db *DBGroupMember) Convert() (*sdk.GroupMemberFullInfo, error) {
|
||||
dst := &sdk.GroupMemberFullInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
|
||||
@ -191,14 +191,14 @@ type PBGroupRequest struct {
|
||||
*sdk.GroupRequest
|
||||
}
|
||||
|
||||
func (pb *PBGroupRequest) convert() (*imdb.GroupRequest, error) {
|
||||
func (pb *PBGroupRequest) Convert() (*imdb.GroupRequest, error) {
|
||||
dst := &imdb.GroupRequest{}
|
||||
utils.CopyStructFields(dst, pb)
|
||||
dst.ReqTime = utils.UnixSecondToTime(int64(pb.ReqTime))
|
||||
dst.HandledTime = utils.UnixSecondToTime(int64(pb.HandleTime))
|
||||
return dst, nil
|
||||
}
|
||||
func (db *DBGroupRequest) convert() (*sdk.GroupRequest, error) {
|
||||
func (db *DBGroupRequest) Convert() (*sdk.GroupRequest, error) {
|
||||
dst := &sdk.GroupRequest{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
dst.ReqTime = uint32(db.ReqTime.Unix())
|
||||
@ -214,7 +214,7 @@ type PBUser struct {
|
||||
*sdk.UserInfo
|
||||
}
|
||||
|
||||
func (pb *PBUser) convert() (*imdb.User, error) {
|
||||
func (pb *PBUser) Convert() (*imdb.User, error) {
|
||||
dst := &imdb.User{}
|
||||
utils.CopyStructFields(dst, pb)
|
||||
dst.Birth = utils.UnixSecondToTime(pb.Birthday)
|
||||
@ -222,7 +222,7 @@ func (pb *PBUser) convert() (*imdb.User, error) {
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
func (db *DBUser) convert() (*sdk.UserInfo, error) {
|
||||
func (db *DBUser) Convert() (*sdk.UserInfo, error) {
|
||||
dst := &sdk.UserInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
dst.CreateTime = uint32(db.CreateTime.Unix())
|
||||
@ -230,7 +230,7 @@ func (db *DBUser) convert() (*sdk.UserInfo, error) {
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
func (db *DBUser) convertPublic() (*sdk.PublicUserInfo, error) {
|
||||
func (db *DBUser) ConvertPublic() (*sdk.PublicUserInfo, error) {
|
||||
dst := &sdk.PublicUserInfo{}
|
||||
utils.CopyStructFields(dst, db)
|
||||
return dst, nil
|
||||
|
6
pkg/common/db/cache/group.go
vendored
6
pkg/common/db/cache/group.go
vendored
@ -22,11 +22,7 @@ type GroupCache struct {
|
||||
}
|
||||
|
||||
func NewGroupCache(rdb redis.UniversalClient, db mysql.GroupModelInterface, opts rockscache.Options) *GroupCache {
|
||||
rcClient := &rockscache.Client{
|
||||
Options: rockscache.Options{},
|
||||
}
|
||||
redisClient := NewRedisClient(rdb)
|
||||
return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient}
|
||||
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, db: db, redisClient: NewRedisClient(rdb)}
|
||||
}
|
||||
|
||||
func (g *GroupCache) getRedisClient() *RedisClient {
|
||||
|
@ -32,6 +32,8 @@ func NewGroupModel(db mysql.GroupModelInterface, rdb redis.UniversalClient, mdb
|
||||
DisableCacheRead: false,
|
||||
StrongConsistency: true,
|
||||
})
|
||||
sg := mdb.Database().Collection()
|
||||
sg.Find()
|
||||
groupModel.mongo = mongoDB.NewMongoClient(mdb)
|
||||
return &groupModel
|
||||
}
|
||||
@ -60,3 +62,20 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error {
|
||||
func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) {
|
||||
return g.cache.GetGroupInfoFromCache(ctx, groupID)
|
||||
}
|
||||
|
||||
func (g *GroupModel) Update(ctx context.Context, groups []*mysql.Group) error {
|
||||
err := g.db.DB.Transaction(func(tx *gorm.DB) error {
|
||||
if err := g.db.Update(ctx, groups, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
var groupIDs []string
|
||||
for _, group := range groups {
|
||||
groupIDs = append(groupIDs, group.GroupID)
|
||||
}
|
||||
if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
@ -1,5 +1,11 @@
|
||||
package mongoDB
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/utils"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
type SuperGroup struct {
|
||||
GroupID string `bson:"group_id" json:"groupID"`
|
||||
MemberIDList []string `bson:"member_id_list" json:"memberIDList"`
|
||||
@ -9,3 +15,167 @@ type UserToSuperGroup struct {
|
||||
UserID string `bson:"user_id" json:"userID"`
|
||||
GroupIDList []string `bson:"group_id_list" json:"groupIDList"`
|
||||
}
|
||||
|
||||
func New
|
||||
|
||||
func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
session, err := d.mongoClient.StartSession()
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "start session failed")
|
||||
}
|
||||
defer session.EndSession(ctx)
|
||||
sCtx := mongo.NewSessionContext(ctx, session)
|
||||
superGroup := SuperGroup{
|
||||
GroupID: groupID,
|
||||
MemberIDList: initMemberIDList,
|
||||
}
|
||||
_, err = c.InsertOne(sCtx, superGroup)
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
var users []UserToSuperGroup
|
||||
for _, v := range initMemberIDList {
|
||||
users = append(users, UserToSuperGroup{
|
||||
UserID: v,
|
||||
})
|
||||
}
|
||||
upsert := true
|
||||
opts := &options.UpdateOptions{
|
||||
Upsert: &upsert,
|
||||
}
|
||||
c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
|
||||
//_, err = c.UpdateMany(sCtx, bson.M{"user_id": bson.M{"$in": initMemberIDList}}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
//if err != nil {
|
||||
// session.AbortTransaction(ctx)
|
||||
// return utils.Wrap(err, "transaction failed")
|
||||
//}
|
||||
for _, userID := range initMemberIDList {
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
superGroup := SuperGroup{}
|
||||
err := c.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup)
|
||||
return superGroup, err
|
||||
}
|
||||
|
||||
func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
session, err := d.mongoClient.StartSession()
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "start session failed")
|
||||
}
|
||||
defer session.EndSession(ctx)
|
||||
sCtx := mongo.NewSessionContext(ctx, session)
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "start transaction failed")
|
||||
}
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}})
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
|
||||
var users []UserToSuperGroup
|
||||
for _, v := range userIDList {
|
||||
users = append(users, UserToSuperGroup{
|
||||
UserID: v,
|
||||
})
|
||||
}
|
||||
upsert := true
|
||||
opts := &options.UpdateOptions{
|
||||
Upsert: &upsert,
|
||||
}
|
||||
for _, userID := range userIDList {
|
||||
_, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
}
|
||||
_ = session.CommitTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
session, err := d.mongoClient.StartSession()
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "start session failed")
|
||||
}
|
||||
defer session.EndSession(ctx)
|
||||
sCtx := mongo.NewSessionContext(ctx, session)
|
||||
_, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}})
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList)
|
||||
if err != nil {
|
||||
_ = session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
_ = session.CommitTransaction(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
|
||||
var user UserToSuperGroup
|
||||
_ = c.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
|
||||
return user, nil
|
||||
}
|
||||
|
||||
func (d *db.DataBases) DeleteSuperGroup(groupID string) error {
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
|
||||
session, err := d.mongoClient.StartSession()
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "start session failed")
|
||||
}
|
||||
defer session.EndSession(ctx)
|
||||
sCtx := mongo.NewSessionContext(ctx, session)
|
||||
superGroup := &SuperGroup{}
|
||||
result := c.FindOneAndDelete(sCtx, bson.M{"group_id": groupID})
|
||||
err = result.Decode(superGroup)
|
||||
if err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
if err = d.RemoveGroupFromUser(ctx, sCtx, groupID, superGroup.MemberIDList); err != nil {
|
||||
session.AbortTransaction(ctx)
|
||||
return utils.Wrap(err, "transaction failed")
|
||||
}
|
||||
session.CommitTransaction(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error {
|
||||
var users []UserToSuperGroup
|
||||
for _, v := range userIDList {
|
||||
users = append(users, UserToSuperGroup{
|
||||
UserID: v,
|
||||
})
|
||||
}
|
||||
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
|
||||
_, err := c.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}})
|
||||
if err != nil {
|
||||
return utils.Wrap(err, "UpdateOne transaction failed")
|
||||
}
|
||||
return err
|
||||
}
|
@ -20,6 +20,7 @@ type GroupModelInterface interface {
|
||||
|
||||
//mongo
|
||||
}
|
||||
|
||||
type Group struct {
|
||||
GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"`
|
||||
GroupName string `gorm:"column:name;size:255" json:"groupName"`
|
||||
@ -68,11 +69,11 @@ func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]i
|
||||
return utils.Wrap(GroupDB.Where("group_id = ?", groupID).Updates(args).Error, "")
|
||||
}
|
||||
|
||||
func (*Group) Update(ctx context.Context, groups []*Group) (err error) {
|
||||
func (g *Group) Update(ctx context.Context, groups []*Group, tx ...*gorm.DB) (err error) {
|
||||
defer func() {
|
||||
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups)
|
||||
}()
|
||||
return utils.Wrap(GroupDB.Updates(&groups).Error, "")
|
||||
return utils.Wrap(getDBConn(g.DB, tx...).Updates(&groups).Error, "")
|
||||
}
|
||||
|
||||
func (*Group) Find(ctx context.Context, groupIDs []string) (groups []*Group, err error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user