mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-04 19:32:17 +08:00
optimize method structures.
This commit is contained in:
parent
3424fc07ad
commit
2e433fb9f3
@ -147,81 +147,6 @@ func (o *GroupApi) GetIncrementalGroupMember(c *gin.Context) {
|
|||||||
|
|
||||||
func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) {
|
func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) {
|
||||||
a2r.Call(group.GroupClient.BatchGetIncrementalGroupMember, o.Client, c)
|
a2r.Call(group.GroupClient.BatchGetIncrementalGroupMember, o.Client, c)
|
||||||
|
|
||||||
// // OLd. Need Deprecated
|
|
||||||
// // type BatchIncrementalReq struct {
|
|
||||||
// // UserID string `json:"user_id"`
|
|
||||||
// // List []*group.GetIncrementalGroupMemberReq `json:"list"`
|
|
||||||
// // }
|
|
||||||
// // type BatchIncrementalResp struct {
|
|
||||||
// // List map[string]*group.GetIncrementalGroupMemberResp `json:"list"`
|
|
||||||
// // }
|
|
||||||
// // req, err := a2r.ParseRequestNotCheck[BatchIncrementalReq](c)
|
|
||||||
// // if err != nil {
|
|
||||||
// // apiresp.GinError(c, err)
|
|
||||||
// // return
|
|
||||||
// // }
|
|
||||||
// // resp := &BatchIncrementalResp{
|
|
||||||
// // List: make(map[string]*group.GetIncrementalGroupMemberResp),
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// //**** Start ****//
|
|
||||||
// var (
|
|
||||||
// changeCount int
|
|
||||||
// )
|
|
||||||
// req, err := a2r.ParseRequestNotCheck[group.BatchGetIncrementalGroupMemberReq](c)
|
|
||||||
// if err != nil {
|
|
||||||
// apiresp.GinError(c, err)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// resp, err := o.Client.BatchGetIncrementalGroupMember(c, req)
|
|
||||||
// if err != nil {
|
|
||||||
// if len(resp.RespList) == 0 {
|
|
||||||
// apiresp.GinError(c, err)
|
|
||||||
// } else {
|
|
||||||
// log.ZError(c, "group incr sync version", err)
|
|
||||||
// apiresp.GinSuccess(c, resp)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// res := make(map[string]*group.GetIncrementalGroupMemberResp)
|
|
||||||
// for k, val := range resp.RespList {
|
|
||||||
// changeCount += len(val.Insert) + len(val.Delete) + len(val.Update)
|
|
||||||
// if changeCount >= 200 {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// res[k] = val
|
|
||||||
// }
|
|
||||||
// resp = &group.BatchGetIncrementalGroupMemberResp{
|
|
||||||
// RespList: res,
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // OLd. Need Deprecated
|
|
||||||
// // for _, req := range req.List {
|
|
||||||
// // if _, ok := resp.List[req.GroupID]; ok {
|
|
||||||
// // continue
|
|
||||||
// // }
|
|
||||||
// // res, err := o.Client.GetIncrementalGroupMember(c, req)
|
|
||||||
// // if err != nil {
|
|
||||||
// // if len(resp.List) == 0 {
|
|
||||||
// // apiresp.GinError(c, err)
|
|
||||||
// // } else {
|
|
||||||
// // log.ZError(c, "group incr sync version", err, "groupID", req.GroupID, "success", len(resp.List))
|
|
||||||
// // apiresp.GinSuccess(c, resp)
|
|
||||||
// // }
|
|
||||||
// // return
|
|
||||||
// // }
|
|
||||||
// // resp.List[req.GroupID] = res
|
|
||||||
// // changeCount += len(res.Insert) + len(res.Delete) + len(res.Update)
|
|
||||||
// // if changeCount >= 200 {
|
|
||||||
// // break
|
|
||||||
// // }
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// apiresp.GinSuccess(c, resp)
|
|
||||||
|
|
||||||
//**** End ****//
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) {
|
func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) {
|
||||||
|
|||||||
@ -130,8 +130,10 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
}
|
}
|
||||||
|
|
||||||
var groupIDs []string
|
var groupIDs []string
|
||||||
groupVersionMap := make(map[string]*VersionInfo)
|
|
||||||
|
groupsVersionMap := make(map[string]*VersionInfo)
|
||||||
groupsMap := make(map[string]*model.Group)
|
groupsMap := make(map[string]*model.Group)
|
||||||
|
hasGroupUpdateMap := make(map[string]bool)
|
||||||
|
|
||||||
var targetKeys, versionIDs []string
|
var targetKeys, versionIDs []string
|
||||||
var versionNumbers []uint64
|
var versionNumbers []uint64
|
||||||
@ -139,7 +141,7 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
// var requestBodyLen int
|
// var requestBodyLen int
|
||||||
|
|
||||||
for _, group := range req.ReqList {
|
for _, group := range req.ReqList {
|
||||||
groupVersionMap[group.GroupID] = &VersionInfo{
|
groupsVersionMap[group.GroupID] = &VersionInfo{
|
||||||
GroupID: group.GroupID,
|
GroupID: group.GroupID,
|
||||||
VersionID: group.VersionID,
|
VersionID: group.VersionID,
|
||||||
VersionNumber: group.Version,
|
VersionNumber: group.Version,
|
||||||
@ -157,19 +159,19 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
if group.Status == constant.GroupStatusDismissed {
|
if group.Status == constant.GroupStatusDismissed {
|
||||||
err = servererrs.ErrDismissedAlready.Wrap()
|
err = servererrs.ErrDismissedAlready.Wrap()
|
||||||
log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID)
|
log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID)
|
||||||
delete(groupVersionMap, group.GroupID)
|
|
||||||
|
delete(groupsVersionMap, group.GroupID)
|
||||||
} else {
|
} else {
|
||||||
groupsMap[group.GroupID] = group
|
groupsMap[group.GroupID] = group
|
||||||
// truegroupIDs = append(truegroupIDs, group.GroupID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for key, val := range groupVersionMap {
|
|
||||||
targetKeys = append(targetKeys, key)
|
|
||||||
versionIDs = append(versionIDs, val.VersionID)
|
|
||||||
versionNumbers = append(versionNumbers, val.VersionNumber)
|
|
||||||
}
|
|
||||||
|
|
||||||
var hasGroupUpdate map[string]bool
|
for groupID, vInfo := range groupsVersionMap {
|
||||||
|
targetKeys = append(targetKeys, groupID)
|
||||||
|
versionIDs = append(versionIDs, vInfo.VersionID)
|
||||||
|
versionNumbers = append(versionNumbers, vInfo.VersionNumber)
|
||||||
|
}
|
||||||
|
|
||||||
opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{
|
opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
TargetKeys: targetKeys,
|
TargetKeys: targetKeys,
|
||||||
@ -180,17 +182,18 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for key, vlog := range vLogs {
|
|
||||||
|
for groupID, vlog := range vLogs {
|
||||||
vlog.Logs = slices.DeleteFunc(vlog.Logs, func(elem model.VersionLogElem) bool {
|
vlog.Logs = slices.DeleteFunc(vlog.Logs, func(elem model.VersionLogElem) bool {
|
||||||
if elem.EID == "" {
|
if elem.EID == "" {
|
||||||
vlog.LogLen--
|
vlog.LogLen--
|
||||||
hasGroupUpdate[key] = true
|
hasGroupUpdateMap[groupID] = true
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
if vlog.LogLen > 0 {
|
if vlog.LogLen > 0 {
|
||||||
hasGroupUpdate[key] = true
|
hasGroupUpdateMap[groupID] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -198,30 +201,24 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
},
|
},
|
||||||
CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache,
|
CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache,
|
||||||
Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
|
Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
|
||||||
// memberInfoMap := make(map[string][]*sdkws.GroupMemberFullInfo)
|
|
||||||
// for _, groupID := range groupIDs {
|
|
||||||
memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids)
|
memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// memberInfoMap:=datautil.SliceToMap(memberInfo, func(e *sdkws.GroupMemberFullInfo) string {
|
|
||||||
// return e.GroupID
|
|
||||||
// })
|
|
||||||
// // memberInfoMap[groupID] = memberInfo
|
|
||||||
// // }
|
|
||||||
return memberInfo, err
|
return memberInfo, err
|
||||||
},
|
},
|
||||||
Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp {
|
Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp {
|
||||||
resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)
|
resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)
|
||||||
|
|
||||||
for key, version := range versions {
|
for groupID, versionLog := range versions {
|
||||||
resList[key] = &pbgroup.GetIncrementalGroupMemberResp{
|
resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{
|
||||||
VersionID: version.ID.Hex(),
|
VersionID: versionLog.ID.Hex(),
|
||||||
Version: uint64(version.Version),
|
Version: uint64(versionLog.Version),
|
||||||
Full: fullMap[key],
|
Full: fullMap[groupID],
|
||||||
Delete: deleteIdsMap[key],
|
Delete: deleteIdsMap[groupID],
|
||||||
Insert: insertListMap[key],
|
Insert: insertListMap[groupID],
|
||||||
Update: updateListMap[key],
|
Update: updateListMap[groupID],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,17 +232,20 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for key, val := range resp.RespList {
|
|
||||||
if val.Full || hasGroupUpdate[key] {
|
for groupID, val := range resp.RespList {
|
||||||
count, err := s.db.FindGroupMemberNum(ctx, key)
|
if val.Full || hasGroupUpdateMap[groupID] {
|
||||||
|
count, err := s.db.FindGroupMemberNum(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
owner, err := s.db.TakeGroupOwner(ctx, key)
|
|
||||||
|
owner, err := s.db.TakeGroupOwner(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.RespList[key].Group = s.groupDB2PB(groupsMap[key], owner.UserID, count)
|
|
||||||
|
resp.RespList[groupID].Group = s.groupDB2PB(groupsMap[groupID], owner.UserID, count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -18,8 +18,7 @@ type BatchOption[A, B any] struct {
|
|||||||
Versions func(ctx context.Context, dIds []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error)
|
Versions func(ctx context.Context, dIds []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error)
|
||||||
CacheMaxVersions func(ctx context.Context, dIds []string) (map[string]*model.VersionLog, error)
|
CacheMaxVersions func(ctx context.Context, dIds []string) (map[string]*model.VersionLog, error)
|
||||||
Find func(ctx context.Context, dId string, ids []string) (A, error)
|
Find func(ctx context.Context, dId string, ids []string) (A, error)
|
||||||
// Resp func(version map[string]*model.VersionLog, deleteIds, insertList, updateList []A, full bool) []*B
|
Resp func(versionsMap map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B
|
||||||
Resp func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *BatchOption[A, B]) newError(msg string) error {
|
func (o *BatchOption[A, B]) newError(msg string) error {
|
||||||
@ -212,26 +211,3 @@ func (o *BatchOption[A, B]) Build() (*B, error) {
|
|||||||
|
|
||||||
return o.Resp(versions, deleteIdsMap, insertListMap, updateListMap, fullMap), nil
|
return o.Resp(versions, deleteIdsMap, insertListMap, updateListMap, fullMap), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// for _, versionLog := range versionLogs {
|
|
||||||
// if versionLog != nil {
|
|
||||||
// if !full {
|
|
||||||
|
|
||||||
// }
|
|
||||||
// insertIds, deleteIds, updateIds = append(insertIds, versionLog.InsertID...), append(deleteIds, versionLog.DeleteIDs...), append(updateIds, versionLog.UpdateIDs...)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// insertList, err := o.Find(o.Ctx, insertIds)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
|
|
||||||
// updateList, err := o.Find(o.Ctx, updateIds)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
|
|
||||||
// full := len(insertIds) > 0 || len(updateIds) > 0
|
|
||||||
|
|
||||||
// return o.Resp(versionLogs, deleteIds, insertList, updateList, full), nil
|
|
||||||
|
|||||||
2
pkg/common/storage/cache/redis/group.go
vendored
2
pkg/common/storage/cache/redis/group.go
vendored
@ -398,8 +398,10 @@ func (g *GroupCacheRedis) BatchFindMaxGroupMemberVersion(ctx context.Context, gr
|
|||||||
}, func(versionLog *model.VersionLog) string {
|
}, func(versionLog *model.VersionLog) string {
|
||||||
return versionLog.DID
|
return versionLog.DID
|
||||||
}, func(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) {
|
}, func(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) {
|
||||||
|
// create two slices with len is groupIDs, just need 0
|
||||||
versions := make([]uint, len(groupIDs))
|
versions := make([]uint, len(groupIDs))
|
||||||
limits := make([]int, len(groupIDs))
|
limits := make([]int, len(groupIDs))
|
||||||
|
|
||||||
return g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits)
|
return g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -504,19 +504,24 @@ func (g *groupDatabase) FindMemberIncrVersion(ctx context.Context, groupID strin
|
|||||||
|
|
||||||
func (g *groupDatabase) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) {
|
func (g *groupDatabase) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) {
|
||||||
if len(groupIDs) == 0 {
|
if len(groupIDs) == 0 {
|
||||||
return nil, nil
|
return nil, errs.New("groupIDs is nil.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// convert []uint64 to []uint
|
||||||
var uintVersions []uint
|
var uintVersions []uint
|
||||||
for _, version := range versions {
|
for _, version := range versions {
|
||||||
uintVersions = append(uintVersions, uint(version))
|
uintVersions = append(uintVersions, uint(version))
|
||||||
}
|
}
|
||||||
|
|
||||||
versionLogs, err := g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, uintVersions, limits)
|
versionLogs, err := g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, uintVersions, limits)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
groupMemberIncrVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string {
|
groupMemberIncrVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string {
|
||||||
return e.DID
|
return e.DID
|
||||||
})
|
})
|
||||||
|
|
||||||
return groupMemberIncrVersionsMap, nil
|
return groupMemberIncrVersionsMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user