2024-08-01 11:25:19 +08:00

224 lines
5.8 KiB
Go

package msggateway
import (
"bytes"
"context"
"fmt"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"strconv"
"sync"
"sync/atomic"
"time"
)
// UserMap tracks the clients connected for each user ID and exposes a channel
// of UserState events describing online/offline changes. The descriptions
// below reflect the userMap implementation in this file.
type UserMap interface {
// GetAll returns all clients stored for userID; the bool reports whether the
// user has an entry.
GetAll(userID string) ([]*Client, bool)
// Get returns the clients of userID whose platform equals platformID. The
// first bool reports whether the user has an entry, the second whether any
// client matched the platform.
Get(userID string, platformID int) ([]*Client, bool, bool)
// Set registers v under userID and publishes the user's state.
Set(userID string, v *Client)
// DeleteClients removes the given clients from userID's entry and reports
// whether the user's entry was removed as a result.
DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
// UserState returns the receive side of the state-event channel.
UserState() <-chan UserState
// GetAllUserStatus returns state snapshots for users selected by comparing
// their stored timestamp with deadline; selected entries are stamped with
// nowtime.
GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
// RecvSubChange reports whether userID has a locally connected platform that
// is not listed in platformIDs.
RecvSubChange(userID string, platformIDs []int32) bool
}
// UserState is one state event for a single user, as sent on the channel
// returned by UserMap.UserState.
type UserState struct {
// UserID identifies the user the event is about.
UserID string
// Online lists the platform IDs of the user's currently stored clients.
Online []int32
// Offline lists the platform IDs of clients that were just removed; it is
// nil for events that do not originate from a removal.
Offline []int32
}
// UserPlatform is the per-user entry of the user map.
type UserPlatform struct {
// Time is the moment the entry's state was last published: push sets it
// after a successful channel send and GetAllUserStatus sets it for the
// entries it returns. It stays at the zero value until then.
Time time.Time
// Clients holds the connections currently stored for the user.
Clients []*Client
}
// String renders the entry for debug logging: the last publish time, the
// number of clients and, for each client, its pointer, platform ID and
// platform name.
func (u *UserPlatform) String() string {
	var out bytes.Buffer
	out.WriteString("UserPlatform{Time: ")
	out.WriteString(u.Time.String())
	out.WriteString(", Clients<")
	out.WriteString(strconv.Itoa(len(u.Clients)))
	out.WriteString(">: [")
	for idx, c := range u.Clients {
		if idx != 0 {
			out.WriteString(", ")
		}
		// Format straight into the buffer instead of building a temporary string.
		fmt.Fprintf(&out, "%p %d %s", c, c.PlatformID, constant.PlatformIDToName(c.PlatformID))
	}
	out.WriteString("]}")
	return out.String()
}
// PlatformIDs returns the platform ID of every stored client, in client
// order and including duplicates. It returns nil when there are no clients.
func (u *UserPlatform) PlatformIDs() []int32 {
	n := len(u.Clients)
	if n == 0 {
		return nil
	}
	ids := make([]int32, n)
	for i, c := range u.Clients {
		ids[i] = int32(c.PlatformID)
	}
	return ids
}
// PlatformIDSet returns the distinct platform IDs of the stored clients as a
// set. It returns nil when there are no clients.
func (u *UserPlatform) PlatformIDSet() map[int32]struct{} {
	if len(u.Clients) == 0 {
		return nil
	}
	set := map[int32]struct{}{}
	for i := range u.Clients {
		id := int32(u.Clients[i].PlatformID)
		set[id] = struct{}{}
	}
	return set
}
// newUserMap builds an empty user map whose state-event channel is buffered
// for 10000 events.
func newUserMap() UserMap {
	m := new(userMap)
	m.data = make(map[string]*UserPlatform)
	m.ch = make(chan UserState, 10000)
	return m
}
// userMap is the UserMap implementation in this file: a map of per-user
// entries guarded by a read/write mutex, plus a buffered event channel.
type userMap struct {
// lock is taken by the exported methods before they touch data.
lock sync.RWMutex
// data maps a user ID to that user's entry.
data map[string]*UserPlatform
// ch carries the state events produced by push.
ch chan UserState
}
// RecvSubChange compares platformIDs, the platforms some other party believes
// userID is online on, with the platforms of the locally stored clients.
//
// It returns false when the user has no local entry or when every local
// platform is already contained in platformIDs. Otherwise at least one local
// platform is missing from platformIDs: the local state is re-published via
// push and true is returned (even if the event channel is full and the event
// is dropped).
//
// The write lock is required because push stamps UserPlatform.Time; doing
// that under the read lock would let concurrent callers race on the field.
func (u *userMap) RecvSubChange(userID string, platformIDs []int32) bool {
	u.lock.Lock()
	defer u.lock.Unlock()
	result, ok := u.data[userID]
	if !ok {
		return false
	}
	// Whatever is left after removing the reported platforms is only known locally.
	localPlatformIDs := result.PlatformIDSet()
	for _, platformID := range platformIDs {
		delete(localPlatformIDs, platformID)
	}
	if len(localPlatformIDs) == 0 {
		return false
	}
	u.push(userID, result, nil)
	return true
}
// push tries to publish the current state of userPlatform for userID without
// blocking. On success it stamps userPlatform.Time with the current time and
// returns true; if the channel buffer is full the event is dropped and false
// is returned. Because it writes userPlatform.Time, callers are expected to
// hold the write lock.
func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
	event := UserState{
		UserID:  userID,
		Online:  userPlatform.PlatformIDs(),
		Offline: offline,
	}
	sent := false
	select {
	case u.ch <- event:
		userPlatform.Time = time.Now()
		sent = true
	default:
	}
	return sent
}
// GetAll returns the clients stored for userID and true, or nil and false
// when the user has no entry. The returned slice is the stored slice itself,
// not a copy.
func (u *userMap) GetAll(userID string) ([]*Client, bool) {
	u.lock.RLock()
	defer u.lock.RUnlock()
	if entry, found := u.data[userID]; found {
		return entry.Clients, true
	}
	return nil, false
}
// Get returns the clients of userID whose PlatformID equals platformID.
// The first bool reports whether the user has an entry at all; the second
// reports whether at least one client matched the platform.
func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
	u.lock.RLock()
	defer u.lock.RUnlock()
	entry, found := u.data[userID]
	if !found {
		return nil, false, false
	}
	var matched []*Client
	for i := range entry.Clients {
		if c := entry.Clients[i]; c.PlatformID == platformID {
			matched = append(matched, c)
		}
	}
	return matched, true, len(matched) != 0
}
// Set stores client under userID, creating the user's entry if it does not
// exist yet, and then publishes the user's updated state through push.
//
// The event is published under userID, the same key the entry is stored and
// logged under, so the event always refers to the entry that was modified
// (matching DeleteClients and RecvSubChange).
func (u *userMap) Set(userID string, client *Client) {
	log.ZDebug(context.Background(), "userMap Set", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "pointer", fmt.Sprintf("%p", client))
	u.lock.Lock()
	defer u.lock.Unlock()
	result, ok := u.data[userID]
	if !ok {
		result = &UserPlatform{}
		u.data[userID] = result
	}
	result.Clients = append(result.Clients, client)
	u.push(userID, result, nil)
}
// DeleteClients removes from userID's entry every stored client whose remote
// address matches the remote address of one of clients, then publishes the
// resulting state (the removed platform IDs are reported as Offline).
//
// It returns true when the user has no clients left and the entry was removed
// from the map, and false otherwise, including when clients is empty or the
// user has no entry.
func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
	if len(clients) == 0 {
		return false
	}
	for i, client := range clients {
		log.ZDebug(context.Background(), "userMap DeleteClients", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "count", fmt.Sprintf("%p %d/%d", clients[i], i+1, len(clients)))
	}
	u.lock.Lock()
	defer u.lock.Unlock()
	result, ok := u.data[userID]
	if !ok {
		return false
	}
	deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string {
		return client.ctx.GetRemoteAddr()
	})
	offline := make([]int32, 0, len(clients))
	// Build the survivors in a fresh slice. GetAll hands the stored slice to
	// callers that read it without the lock, so filtering in place would
	// overwrite elements they may still be iterating over.
	remaining := make([]*Client, 0, len(result.Clients))
	for _, client := range result.Clients {
		if _, delCli := deleteAddr[client.ctx.GetRemoteAddr()]; delCli {
			offline = append(offline, int32(client.PlatformID))
		} else {
			remaining = append(remaining, client)
		}
	}
	result.Clients = remaining
	isDeleteUser = len(remaining) == 0
	if isDeleteUser {
		delete(u.data, userID)
	}
	// The state is published whether or not the user still has clients.
	u.push(userID, result, offline)
	return isDeleteUser
}
// opIDIncr is a package-wide counter; GetAllUserStatus increments it to build
// a distinct operation ID ("op_<n>") for the log lines of each call.
var opIDIncr atomic.Int64
// GetAllUserStatus returns the online state of every user whose entry was
// last published before deadline, and stamps each returned entry with
// nowtime so it is not selected again until it becomes stale once more.
// Entries published at or after deadline are skipped. Entries that were never
// published successfully keep the zero Time and are therefore always
// selected.
//
// The write lock is taken because the selected entries' Time field is
// modified. The summary log line is deferred before the unlock so that it
// runs after the lock has been released.
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) (result []UserState) {
	ctx := mcontext.SetOperationID(context.Background(), fmt.Sprintf("op_%d", opIDIncr.Add(1)))
	log.ZDebug(ctx, "userMap GetAllUserStatus", "deadline", deadline, "nowtime", nowtime)
	defer func() {
		log.ZDebug(ctx, "userMap GetAllUserStatus", "num", len(result), "result", result)
	}()
	u.lock.Lock()
	defer u.lock.Unlock()
	result = make([]UserState, 0, len(u.data))
	for userID, userPlatform := range u.data {
		// add is true for stale entries, i.e. the ones that must be reported.
		add := userPlatform.Time.Before(deadline)
		log.ZDebug(ctx, "userMap GetAllUserStatus", "userID", userID, "add", add, "platforms", userPlatform.String())
		if !add {
			continue
		}
		userPlatform.Time = nowtime
		online := make([]int32, 0, len(userPlatform.Clients))
		for _, client := range userPlatform.Clients {
			online = append(online, int32(client.PlatformID))
		}
		result = append(result, UserState{UserID: userID, Online: online})
	}
	return result
}
// UserState returns the receive-only side of the channel that push sends
// state events on.
func (u *userMap) UserState() <-chan UserState {
return u.ch
}