mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	* fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index --------- Co-authored-by: withchao <withchao@users.noreply.github.com> Co-authored-by: Monet Lee <monet_lee@163.com> Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: icey-yu <1186114839@qq.com>
		
			
				
	
	
		
			167 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			167 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package msggateway
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"github.com/openimsdk/protocol/sdkws"
 | |
| 	"github.com/openimsdk/tools/log"
 | |
| 	"github.com/openimsdk/tools/utils/datautil"
 | |
| 	"google.golang.org/protobuf/proto"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
 | |
| 	if ws.clients.RecvSubChange(userID, platformIDs) {
 | |
| 		log.ZDebug(ctx, "gateway receive subscription message and go back online", "userID", userID, "platformIDs", platformIDs)
 | |
| 	} else {
 | |
| 		log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs)
 | |
| 	}
 | |
| 	ws.pushUserIDOnlineStatus(ctx, userID, platformIDs)
 | |
| }
 | |
| 
 | |
| func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) {
 | |
| 	var sub sdkws.SubUserOnlineStatus
 | |
| 	if err := proto.Unmarshal(data.Data, &sub); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	ws.subscription.Sub(client, sub.SubscribeUserID, sub.UnsubscribeUserID)
 | |
| 	var resp sdkws.SubUserOnlineStatusTips
 | |
| 	if len(sub.SubscribeUserID) > 0 {
 | |
| 		resp.Subscribers = make([]*sdkws.SubUserOnlineStatusElem, 0, len(sub.SubscribeUserID))
 | |
| 		for _, userID := range sub.SubscribeUserID {
 | |
| 			platformIDs, err := ws.online.GetUserOnlinePlatform(ctx, userID)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			resp.Subscribers = append(resp.Subscribers, &sdkws.SubUserOnlineStatusElem{
 | |
| 				UserID:            userID,
 | |
| 				OnlinePlatformIDs: platformIDs,
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 	return proto.Marshal(&resp)
 | |
| }
 | |
| 
 | |
| func newSubscription() *Subscription {
 | |
| 	return &Subscription{
 | |
| 		userIDs: make(map[string]*subClient),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type subClient struct {
 | |
| 	clients map[string]*Client
 | |
| }
 | |
| 
 | |
| type Subscription struct {
 | |
| 	lock    sync.RWMutex
 | |
| 	userIDs map[string]*subClient // subscribe to the user's client connection
 | |
| }
 | |
| 
 | |
| func (s *Subscription) DelClient(client *Client) {
 | |
| 	client.subLock.Lock()
 | |
| 	userIDs := datautil.Keys(client.subUserIDs)
 | |
| 	for _, userID := range userIDs {
 | |
| 		delete(client.subUserIDs, userID)
 | |
| 	}
 | |
| 	client.subLock.Unlock()
 | |
| 	if len(userIDs) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	addr := client.ctx.GetRemoteAddr()
 | |
| 	s.lock.Lock()
 | |
| 	defer s.lock.Unlock()
 | |
| 	for _, userID := range userIDs {
 | |
| 		sub, ok := s.userIDs[userID]
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		delete(sub.clients, addr)
 | |
| 		if len(sub.clients) == 0 {
 | |
| 			delete(s.userIDs, userID)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Subscription) GetClient(userID string) []*Client {
 | |
| 	s.lock.RLock()
 | |
| 	defer s.lock.RUnlock()
 | |
| 	cs, ok := s.userIDs[userID]
 | |
| 	if !ok {
 | |
| 		return nil
 | |
| 	}
 | |
| 	clients := make([]*Client, 0, len(cs.clients))
 | |
| 	for _, client := range cs.clients {
 | |
| 		clients = append(clients, client)
 | |
| 	}
 | |
| 	return clients
 | |
| }
 | |
| 
 | |
| func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | |
| 	if len(addUserIDs)+len(delUserIDs) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	var (
 | |
| 		del = make(map[string]struct{})
 | |
| 		add = make(map[string]struct{})
 | |
| 	)
 | |
| 	client.subLock.Lock()
 | |
| 	for _, userID := range delUserIDs {
 | |
| 		if _, ok := client.subUserIDs[userID]; !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		del[userID] = struct{}{}
 | |
| 		delete(client.subUserIDs, userID)
 | |
| 	}
 | |
| 	for _, userID := range addUserIDs {
 | |
| 		delete(del, userID)
 | |
| 		if _, ok := client.subUserIDs[userID]; ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		client.subUserIDs[userID] = struct{}{}
 | |
| 		add[userID] = struct{}{}
 | |
| 	}
 | |
| 	client.subLock.Unlock()
 | |
| 	if len(del)+len(add) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	addr := client.ctx.GetRemoteAddr()
 | |
| 	s.lock.Lock()
 | |
| 	defer s.lock.Unlock()
 | |
| 	for userID := range del {
 | |
| 		sub, ok := s.userIDs[userID]
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		delete(sub.clients, addr)
 | |
| 		if len(sub.clients) == 0 {
 | |
| 			delete(s.userIDs, userID)
 | |
| 		}
 | |
| 	}
 | |
| 	for userID := range add {
 | |
| 		sub, ok := s.userIDs[userID]
 | |
| 		if !ok {
 | |
| 			sub = &subClient{clients: make(map[string]*Client)}
 | |
| 			s.userIDs[userID] = sub
 | |
| 		}
 | |
| 		sub.clients[addr] = client
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) {
 | |
| 	clients := ws.subscription.GetClient(userID)
 | |
| 	if len(clients) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{
 | |
| 		Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | |
| 		return
 | |
| 	}
 | |
| 	for _, client := range clients {
 | |
| 		if err := client.PushUserOnlineStatus(onlineStatus); err != nil {
 | |
| 			log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs)
 | |
| 		}
 | |
| 	}
 | |
| }
 |