mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge branch 'openimsdk:main' into main
This commit is contained in:
		
						commit
						b61dd3631e
					
				| @ -11,10 +11,10 @@ import ( | |||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	pbgroup "github.com/openimsdk/protocol/group" | 	pbgroup "github.com/openimsdk/protocol/group" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"github.com/openimsdk/tools/errs" |  | ||||||
| 	"github.com/openimsdk/tools/log" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | const versionSyncLimit = 500 | ||||||
|  | 
 | ||||||
| func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { | func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { | ||||||
| 	vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) | 	vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -132,150 +132,6 @@ func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou | |||||||
| 	return resp, nil | 	return resp, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (resp *pbgroup.BatchGetIncrementalGroupMemberResp, err error) { |  | ||||||
| 	type VersionInfo struct { |  | ||||||
| 		GroupID       string |  | ||||||
| 		VersionID     string |  | ||||||
| 		VersionNumber uint64 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	var groupIDs []string |  | ||||||
| 
 |  | ||||||
| 	groupsVersionMap := make(map[string]*VersionInfo) |  | ||||||
| 	groupsMap := make(map[string]*model.Group) |  | ||||||
| 	hasGroupUpdateMap := make(map[string]bool) |  | ||||||
| 	sortVersionMap := make(map[string]uint64) |  | ||||||
| 
 |  | ||||||
| 	var targetKeys, versionIDs []string |  | ||||||
| 	var versionNumbers []uint64 |  | ||||||
| 
 |  | ||||||
| 	var requestBodyLen int |  | ||||||
| 
 |  | ||||||
| 	for _, group := range req.ReqList { |  | ||||||
| 		groupsVersionMap[group.GroupID] = &VersionInfo{ |  | ||||||
| 			GroupID:       group.GroupID, |  | ||||||
| 			VersionID:     group.VersionID, |  | ||||||
| 			VersionNumber: group.Version, |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		groupIDs = append(groupIDs, group.GroupID) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	groups, err := g.db.FindGroup(ctx, groupIDs) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, errs.Wrap(err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for _, group := range groups { |  | ||||||
| 		if group.Status == constant.GroupStatusDismissed { |  | ||||||
| 			err = servererrs.ErrDismissedAlready.Wrap() |  | ||||||
| 			log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID) |  | ||||||
| 
 |  | ||||||
| 			delete(groupsVersionMap, group.GroupID) |  | ||||||
| 		} else { |  | ||||||
| 			groupsMap[group.GroupID] = group |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for groupID, vInfo := range groupsVersionMap { |  | ||||||
| 		targetKeys = append(targetKeys, groupID) |  | ||||||
| 		versionIDs = append(versionIDs, vInfo.VersionID) |  | ||||||
| 		versionNumbers = append(versionNumbers, vInfo.VersionNumber) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{ |  | ||||||
| 		Ctx:            ctx, |  | ||||||
| 		TargetKeys:     targetKeys, |  | ||||||
| 		VersionIDs:     versionIDs, |  | ||||||
| 		VersionNumbers: versionNumbers, |  | ||||||
| 		Versions: func(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) { |  | ||||||
| 			vLogs, err := g.db.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, errs.Wrap(err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			for groupID, vlog := range vLogs { |  | ||||||
| 				vlogElems := make([]model.VersionLogElem, 0, len(vlog.Logs)) |  | ||||||
| 				for i, log := range vlog.Logs { |  | ||||||
| 					switch log.EID { |  | ||||||
| 					case model.VersionGroupChangeID: |  | ||||||
| 						vlog.LogLen-- |  | ||||||
| 						hasGroupUpdateMap[groupID] = true |  | ||||||
| 					case model.VersionSortChangeID: |  | ||||||
| 						vlog.LogLen-- |  | ||||||
| 						sortVersionMap[groupID] = uint64(log.Version) |  | ||||||
| 					default: |  | ||||||
| 						vlogElems = append(vlogElems, vlog.Logs[i]) |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 				vlog.Logs = vlogElems |  | ||||||
| 				if vlog.LogLen > 0 { |  | ||||||
| 					hasGroupUpdateMap[groupID] = true |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			return vLogs, nil |  | ||||||
| 		}, |  | ||||||
| 		CacheMaxVersions: g.db.BatchFindMaxGroupMemberVersionCache, |  | ||||||
| 		Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { |  | ||||||
| 			memberInfo, err := g.getGroupMembersInfo(ctx, groupID, ids) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			return memberInfo, err |  | ||||||
| 		}, |  | ||||||
| 		Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp { |  | ||||||
| 			resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) |  | ||||||
| 
 |  | ||||||
| 			for groupID, versionLog := range versions { |  | ||||||
| 				resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{ |  | ||||||
| 					VersionID:   versionLog.ID.Hex(), |  | ||||||
| 					Version:     uint64(versionLog.Version), |  | ||||||
| 					Full:        fullMap[groupID], |  | ||||||
| 					Delete:      deleteIdsMap[groupID], |  | ||||||
| 					Insert:      insertListMap[groupID], |  | ||||||
| 					Update:      updateListMap[groupID], |  | ||||||
| 					SortVersion: sortVersionMap[groupID], |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				requestBodyLen += len(insertListMap[groupID]) + len(updateListMap[groupID]) + len(deleteIdsMap[groupID]) |  | ||||||
| 				if requestBodyLen > 200 { |  | ||||||
| 					break |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			return &pbgroup.BatchGetIncrementalGroupMemberResp{ |  | ||||||
| 				RespList: resList, |  | ||||||
| 			} |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	resp, err = opt.Build() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, errs.Wrap(err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for groupID, val := range resp.RespList { |  | ||||||
| 		if val.Full || hasGroupUpdateMap[groupID] { |  | ||||||
| 			count, err := g.db.FindGroupMemberNum(ctx, groupID) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			owner, err := g.db.TakeGroupOwner(ctx, groupID) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			resp.RespList[groupID].Group = g.groupDB2PB(groupsMap[groupID], owner.UserID, count) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return resp, nil |  | ||||||
| 
 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { | func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { | ||||||
| 	if err := authverify.CheckAccessV3(ctx, req.UserID, g.config.Share.IMAdminUserID); err != nil { | 	if err := authverify.CheckAccessV3(ctx, req.UserID, g.config.Share.IMAdminUserID); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -301,3 +157,23 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. | |||||||
| 	} | 	} | ||||||
| 	return opt.Build() | 	return opt.Build() | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { | ||||||
|  | 	var num int | ||||||
|  | 	resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) | ||||||
|  | 	for _, memberReq := range req.ReqList { | ||||||
|  | 		if _, ok := resp[memberReq.GroupID]; ok { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		resp[memberReq.GroupID] = memberResp | ||||||
|  | 		num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) | ||||||
|  | 		if num >= versionSyncLimit { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user