mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 03:13:15 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			224 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package msggateway
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"github.com/openimsdk/protocol/constant"
 | 
						|
	"github.com/openimsdk/tools/log"
 | 
						|
	"github.com/openimsdk/tools/mcontext"
 | 
						|
	"github.com/openimsdk/tools/utils/datautil"
 | 
						|
	"strconv"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
type UserMap interface {
 | 
						|
	GetAll(userID string) ([]*Client, bool)
 | 
						|
	Get(userID string, platformID int) ([]*Client, bool, bool)
 | 
						|
	Set(userID string, v *Client)
 | 
						|
	DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
 | 
						|
	UserState() <-chan UserState
 | 
						|
	GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
 | 
						|
	RecvSubChange(userID string, platformIDs []int32) bool
 | 
						|
}
 | 
						|
 | 
						|
// UserState is one state-change event for a single user.
type UserState struct {
	// UserID identifies the user the event is about.
	UserID string
	// Online lists the platform IDs of the user's currently registered clients.
	Online []int32
	// Offline lists the platform IDs of clients that were just removed.
	Offline []int32
}
 | 
						|
 | 
						|
type UserPlatform struct {
 | 
						|
	Time    time.Time
 | 
						|
	Clients []*Client
 | 
						|
}
 | 
						|
 | 
						|
func (u *UserPlatform) String() string {
 | 
						|
	buf := bytes.NewBuffer(nil)
 | 
						|
	buf.WriteString("UserPlatform{Time: ")
 | 
						|
	buf.WriteString(u.Time.String())
 | 
						|
	buf.WriteString(", Clients<")
 | 
						|
	buf.WriteString(strconv.Itoa(len(u.Clients)))
 | 
						|
	buf.WriteString(">: [")
 | 
						|
	for i, client := range u.Clients {
 | 
						|
		if i > 0 {
 | 
						|
			buf.WriteString(", ")
 | 
						|
		}
 | 
						|
		buf.WriteString(fmt.Sprintf("%p %d %s", u.Clients[i], client.PlatformID, constant.PlatformIDToName(client.PlatformID)))
 | 
						|
	}
 | 
						|
	buf.WriteString("]}")
 | 
						|
	return buf.String()
 | 
						|
}
 | 
						|
 | 
						|
func (u *UserPlatform) PlatformIDs() []int32 {
 | 
						|
	if len(u.Clients) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	platformIDs := make([]int32, 0, len(u.Clients))
 | 
						|
	for _, client := range u.Clients {
 | 
						|
		platformIDs = append(platformIDs, int32(client.PlatformID))
 | 
						|
	}
 | 
						|
	return platformIDs
 | 
						|
}
 | 
						|
 | 
						|
func (u *UserPlatform) PlatformIDSet() map[int32]struct{} {
 | 
						|
	if len(u.Clients) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	platformIDs := make(map[int32]struct{})
 | 
						|
	for _, client := range u.Clients {
 | 
						|
		platformIDs[int32(client.PlatformID)] = struct{}{}
 | 
						|
	}
 | 
						|
	return platformIDs
 | 
						|
}
 | 
						|
 | 
						|
func newUserMap() UserMap {
 | 
						|
	return &userMap{
 | 
						|
		data: make(map[string]*UserPlatform),
 | 
						|
		ch:   make(chan UserState, 10000),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type userMap struct {
 | 
						|
	lock sync.RWMutex
 | 
						|
	data map[string]*UserPlatform
 | 
						|
	ch   chan UserState
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) RecvSubChange(userID string, platformIDs []int32) bool {
 | 
						|
	u.lock.RLock()
 | 
						|
	defer u.lock.RUnlock()
 | 
						|
	result, ok := u.data[userID]
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	localPlatformIDs := result.PlatformIDSet()
 | 
						|
	for _, platformID := range platformIDs {
 | 
						|
		delete(localPlatformIDs, platformID)
 | 
						|
	}
 | 
						|
	if len(localPlatformIDs) == 0 {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	u.push(userID, result, nil)
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
 | 
						|
	select {
 | 
						|
	case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
 | 
						|
		userPlatform.Time = time.Now()
 | 
						|
		return true
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) GetAll(userID string) ([]*Client, bool) {
 | 
						|
	u.lock.RLock()
 | 
						|
	defer u.lock.RUnlock()
 | 
						|
	result, ok := u.data[userID]
 | 
						|
	if !ok {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	return result.Clients, true
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
 | 
						|
	u.lock.RLock()
 | 
						|
	defer u.lock.RUnlock()
 | 
						|
	result, ok := u.data[userID]
 | 
						|
	if !ok {
 | 
						|
		return nil, false, false
 | 
						|
	}
 | 
						|
	var clients []*Client
 | 
						|
	for _, client := range result.Clients {
 | 
						|
		if client.PlatformID == platformID {
 | 
						|
			clients = append(clients, client)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return clients, true, len(clients) > 0
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) Set(userID string, client *Client) {
 | 
						|
	log.ZDebug(context.Background(), "userMap Set", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "pointer", fmt.Sprintf("%p", client))
 | 
						|
	u.lock.Lock()
 | 
						|
	defer u.lock.Unlock()
 | 
						|
	result, ok := u.data[userID]
 | 
						|
	if ok {
 | 
						|
		result.Clients = append(result.Clients, client)
 | 
						|
	} else {
 | 
						|
		result = &UserPlatform{
 | 
						|
			Clients: []*Client{client},
 | 
						|
		}
 | 
						|
		u.data[userID] = result
 | 
						|
	}
 | 
						|
	u.push(client.UserID, result, nil)
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
 | 
						|
	if len(clients) == 0 {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	for i, client := range clients {
 | 
						|
		log.ZDebug(context.Background(), "userMap DeleteClients", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "count", fmt.Sprintf("%p %d/%d", clients[i], i+1, len(clients)))
 | 
						|
	}
 | 
						|
	u.lock.Lock()
 | 
						|
	defer u.lock.Unlock()
 | 
						|
	result, ok := u.data[userID]
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	offline := make([]int32, 0, len(clients))
 | 
						|
	deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string {
 | 
						|
		return client.ctx.GetRemoteAddr()
 | 
						|
	})
 | 
						|
	tmp := result.Clients
 | 
						|
	result.Clients = result.Clients[:0]
 | 
						|
	for _, client := range tmp {
 | 
						|
		if _, delCli := deleteAddr[client.ctx.GetRemoteAddr()]; delCli {
 | 
						|
			offline = append(offline, int32(client.PlatformID))
 | 
						|
		} else {
 | 
						|
			result.Clients = append(result.Clients, client)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	defer u.push(userID, result, offline)
 | 
						|
	if len(result.Clients) > 0 {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	delete(u.data, userID)
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// opIDIncr is a process-wide counter; GetAllUserStatus increments it to build
// a distinct operation ID ("op_<n>") for the log context of each call.
var opIDIncr atomic.Int64
 | 
						|
 | 
						|
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) (result []UserState) {
 | 
						|
	ctx := mcontext.SetOperationID(context.Background(), fmt.Sprintf("op_%d", opIDIncr.Add(1)))
 | 
						|
	log.ZDebug(ctx, "userMap GetAllUserStatus", "deadline", deadline, "nowtime", nowtime)
 | 
						|
	defer func() {
 | 
						|
		log.ZDebug(ctx, "userMap GetAllUserStatus", "num", len(result), "result", result)
 | 
						|
	}()
 | 
						|
	u.lock.RLock()
 | 
						|
	defer u.lock.RUnlock()
 | 
						|
	result = make([]UserState, 0, len(u.data))
 | 
						|
	for userID, userPlatform := range u.data {
 | 
						|
		skip := deadline.Before(userPlatform.Time)
 | 
						|
		log.ZDebug(ctx, "userMap GetAllUserStatus", "userID", userID, "skip", skip, "platforms", userPlatform.String())
 | 
						|
		if skip {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		userPlatform.Time = nowtime
 | 
						|
		online := make([]int32, 0, len(userPlatform.Clients))
 | 
						|
		for _, client := range userPlatform.Clients {
 | 
						|
			online = append(online, int32(client.PlatformID))
 | 
						|
		}
 | 
						|
		result = append(result, UserState{UserID: userID, Online: online})
 | 
						|
	}
 | 
						|
	return result
 | 
						|
}
 | 
						|
 | 
						|
func (u *userMap) UserState() <-chan UserState {
 | 
						|
	return u.ch
 | 
						|
}
 |