mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	sub
This commit is contained in:
		
							parent
							
								
									67d17c82d5
								
							
						
					
					
						commit
						9a4f5f78cb
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -175,3 +175,5 @@ require (
 | 
			
		||||
	golang.org/x/crypto v0.21.0 // indirect
 | 
			
		||||
	gopkg.in/ini.v1 v1.67.0 // indirect
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							@ -262,8 +262,6 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
			
		||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
			
		||||
 | 
			
		||||
@ -72,8 +72,8 @@ type Client struct {
 | 
			
		||||
	closed         atomic.Bool
 | 
			
		||||
	closedErr      error
 | 
			
		||||
	token          string
 | 
			
		||||
	//subLock        sync.Mutex
 | 
			
		||||
	//subUserIDs     map[string]struct{}
 | 
			
		||||
	subLock        sync.Mutex
 | 
			
		||||
	subUserIDs     map[string]struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ResetClient updates the client's state with new connection and context information.
 | 
			
		||||
@ -204,6 +204,8 @@ func (c *Client) handleMessage(message []byte) error {
 | 
			
		||||
		resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
 | 
			
		||||
	case WsSetBackgroundStatus:
 | 
			
		||||
		resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq)
 | 
			
		||||
	case WsSubUserOnlineStatus:
 | 
			
		||||
		resp, messageErr = c.longConnServer.SubUserOnlineStatus(ctx, c, binaryReq)
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf(
 | 
			
		||||
			"ReqIdentifier failed,sendID:%s,msgIncr:%s,reqIdentifier:%d",
 | 
			
		||||
 | 
			
		||||
@ -16,10 +16,10 @@ package msggateway
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"crypto/rand"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	"unsafe"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func mockRandom() []byte {
 | 
			
		||||
@ -132,3 +132,8 @@ func BenchmarkDecompressWithSyncPool(b *testing.B) {
 | 
			
		||||
		assert.Equal(b, nil, err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestName(t *testing.T) {
 | 
			
		||||
	t.Log(unsafe.Sizeof(Client{}))
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -43,6 +43,7 @@ const (
 | 
			
		||||
	WSKickOnlineMsg       = 2002
 | 
			
		||||
	WsLogoutMsg           = 2003
 | 
			
		||||
	WsSetBackgroundStatus = 2004
 | 
			
		||||
	WsSubUserOnlineStatus = 2005
 | 
			
		||||
	WSDataError           = 3001
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import (
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
	"github.com/openimsdk/protocol/msggateway"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery"
 | 
			
		||||
@ -31,6 +32,10 @@ import (
 | 
			
		||||
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
	s.LongConnServer.SetDiscoveryRegistry(disCov, config)
 | 
			
		||||
	msggateway.RegisterMsgGatewayServer(server, s)
 | 
			
		||||
	s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
 | 
			
		||||
	if s.ready != nil {
 | 
			
		||||
		return s.ready(s)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -50,18 +55,21 @@ type Server struct {
 | 
			
		||||
	LongConnServer LongConnServer
 | 
			
		||||
	config         *Config
 | 
			
		||||
	pushTerminal   map[int]struct{}
 | 
			
		||||
	ready          func(srv *Server) error
 | 
			
		||||
	userRcp        rpcclient.UserRpcClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
 | 
			
		||||
	s.LongConnServer = LongConnServer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config) *Server {
 | 
			
		||||
func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server {
 | 
			
		||||
	s := &Server{
 | 
			
		||||
		rpcPort:        rpcPort,
 | 
			
		||||
		LongConnServer: longConnServer,
 | 
			
		||||
		pushTerminal:   make(map[int]struct{}),
 | 
			
		||||
		config:         conf,
 | 
			
		||||
		ready:          ready,
 | 
			
		||||
	}
 | 
			
		||||
	s.pushTerminal[constant.IOSPlatformID] = struct{}{}
 | 
			
		||||
	s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package msggateway
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
 | 
			
		||||
	"github.com/openimsdk/tools/db/redisutil"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
	"time"
 | 
			
		||||
@ -56,11 +57,13 @@ func Start(ctx context.Context, index int, conf *Config) error {
 | 
			
		||||
		WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
 | 
			
		||||
		longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	go longServer.ChangeOnlineStatus(4)
 | 
			
		||||
 | 
			
		||||
	go longServer.SubscriberUserOnlineStatusChanges(rdb)
 | 
			
		||||
 | 
			
		||||
	hubServer := NewServer(rpcPort, longServer, conf)
 | 
			
		||||
	netDone := make(chan error)
 | 
			
		||||
	go func() {
 | 
			
		||||
		err = hubServer.Start(ctx, index, conf)
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
 | 
			
		||||
	pbAuth "github.com/openimsdk/protocol/auth"
 | 
			
		||||
	"github.com/openimsdk/tools/mcontext"
 | 
			
		||||
	"net/http"
 | 
			
		||||
@ -48,20 +49,22 @@ type LongConnServer interface {
 | 
			
		||||
	KickUserConn(client *Client) error
 | 
			
		||||
	UnRegister(c *Client)
 | 
			
		||||
	SetKickHandlerInfo(i *kickHandler)
 | 
			
		||||
	SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
 | 
			
		||||
	Compressor
 | 
			
		||||
	Encoder
 | 
			
		||||
	MessageHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WsServer struct {
 | 
			
		||||
	msgGatewayConfig *Config
 | 
			
		||||
	port             int
 | 
			
		||||
	wsMaxConnNum     int64
 | 
			
		||||
	registerChan     chan *Client
 | 
			
		||||
	unregisterChan   chan *Client
 | 
			
		||||
	kickHandlerChan  chan *kickHandler
 | 
			
		||||
	clients          UserMap
 | 
			
		||||
	//subscription      *Subscription
 | 
			
		||||
	msgGatewayConfig  *Config
 | 
			
		||||
	port              int
 | 
			
		||||
	wsMaxConnNum      int64
 | 
			
		||||
	registerChan      chan *Client
 | 
			
		||||
	unregisterChan    chan *Client
 | 
			
		||||
	kickHandlerChan   chan *kickHandler
 | 
			
		||||
	clients           UserMap
 | 
			
		||||
	online            *rpccache.OnlineCache
 | 
			
		||||
	subscription      *Subscription
 | 
			
		||||
	clientPool        sync.Pool
 | 
			
		||||
	onlineUserNum     atomic.Int64
 | 
			
		||||
	onlineUserConnNum atomic.Int64
 | 
			
		||||
@ -125,6 +128,8 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
 | 
			
		||||
	for _, o := range opts {
 | 
			
		||||
		o(&config)
 | 
			
		||||
	}
 | 
			
		||||
	//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
 | 
			
		||||
 | 
			
		||||
	v := validator.New()
 | 
			
		||||
	return &WsServer{
 | 
			
		||||
		msgGatewayConfig: msgGatewayConfig,
 | 
			
		||||
@ -142,10 +147,10 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
 | 
			
		||||
		kickHandlerChan: make(chan *kickHandler, 1000),
 | 
			
		||||
		validate:        v,
 | 
			
		||||
		clients:         newUserMap(),
 | 
			
		||||
		//subscription:    newSubscription(),
 | 
			
		||||
		Compressor:    NewGzipCompressor(),
 | 
			
		||||
		Encoder:       NewGobEncoder(),
 | 
			
		||||
		webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
 | 
			
		||||
		subscription:    newSubscription(),
 | 
			
		||||
		Compressor:      NewGzipCompressor(),
 | 
			
		||||
		Encoder:         NewGobEncoder(),
 | 
			
		||||
		webhookClient:   webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -353,6 +358,9 @@ func (ws *WsServer) unregisterClient(client *Client) {
 | 
			
		||||
		prommetrics.OnlineUserGauge.Dec()
 | 
			
		||||
	}
 | 
			
		||||
	ws.onlineUserConnNum.Add(-1)
 | 
			
		||||
	client.subLock.Lock()
 | 
			
		||||
	clear(client.subUserIDs)
 | 
			
		||||
	client.subLock.Unlock()
 | 
			
		||||
	//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
 | 
			
		||||
	log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
 | 
			
		||||
		ws.onlineUserNum.Load(), "online user conn Num",
 | 
			
		||||
 | 
			
		||||
@ -2,174 +2,180 @@ package msggateway
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
	"github.com/openimsdk/tools/mcontext"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/idutil"
 | 
			
		||||
	"google.golang.org/protobuf/proto"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (ws *WsServer) SubscriberUserOnlineStatusChanges(rdb redis.UniversalClient) {
 | 
			
		||||
	ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
 | 
			
		||||
	for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
 | 
			
		||||
		userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
 | 
			
		||||
func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
 | 
			
		||||
	if ws.clients.RecvSubChange(userID, platformIDs) {
 | 
			
		||||
		log.ZDebug(ctx, "gateway receive subscription message and go back online", "userID", userID, "platformIDs", platformIDs)
 | 
			
		||||
	} else {
 | 
			
		||||
		log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs)
 | 
			
		||||
	}
 | 
			
		||||
	ws.pushUserIDOnlineStatus(ctx, userID, platformIDs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) {
 | 
			
		||||
	var sub sdkws.SubUserOnlineStatus
 | 
			
		||||
	if err := proto.Unmarshal(data.Data, &sub); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	ws.subscription.Sub(client, sub.SubscribeUserID, sub.UnsubscribeUserID)
 | 
			
		||||
	var resp sdkws.SubUserOnlineStatusTips
 | 
			
		||||
	if len(sub.SubscribeUserID) > 0 {
 | 
			
		||||
		resp.Subscribers = make([]*sdkws.SubUserOnlineStatusElem, 0, len(sub.SubscribeUserID))
 | 
			
		||||
		for _, userID := range sub.SubscribeUserID {
 | 
			
		||||
			platformIDs, err := ws.online.GetUserOnlinePlatform(ctx, userID)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			resp.Subscribers = append(resp.Subscribers, &sdkws.SubUserOnlineStatusElem{
 | 
			
		||||
				UserID:            userID,
 | 
			
		||||
				OnlinePlatformIDs: platformIDs,
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return proto.Marshal(&resp)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type subClient struct {
 | 
			
		||||
	clients map[string]*Client
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newSubscription() *Subscription {
 | 
			
		||||
	return &Subscription{
 | 
			
		||||
		userIDs: make(map[string]*subClient),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Subscription struct {
 | 
			
		||||
	lock    sync.RWMutex
 | 
			
		||||
	userIDs map[string]*subClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Subscription) GetClient(userID string) []*Client {
 | 
			
		||||
	s.lock.RLock()
 | 
			
		||||
	defer s.lock.RUnlock()
 | 
			
		||||
	cs, ok := s.userIDs[userID]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	clients := make([]*Client, 0, len(cs.clients))
 | 
			
		||||
	for _, client := range cs.clients {
 | 
			
		||||
		clients = append(clients, client)
 | 
			
		||||
	}
 | 
			
		||||
	return clients
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Subscription) DelClient(client *Client) {
 | 
			
		||||
	client.subLock.Lock()
 | 
			
		||||
	userIDs := datautil.Keys(client.subUserIDs)
 | 
			
		||||
	for _, userID := range userIDs {
 | 
			
		||||
		delete(client.subUserIDs, userID)
 | 
			
		||||
	}
 | 
			
		||||
	client.subLock.Unlock()
 | 
			
		||||
	if len(userIDs) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	for _, userID := range userIDs {
 | 
			
		||||
		sub, ok := s.userIDs[userID]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if ws.clients.RecvSubChange(userID, platformIDs) {
 | 
			
		||||
			log.ZDebug(ctx, "gateway receive subscription message and go back online", "userID", userID, "platformIDs", platformIDs)
 | 
			
		||||
		} else {
 | 
			
		||||
			log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs)
 | 
			
		||||
		delete(sub.clients, addr)
 | 
			
		||||
		if len(sub.clients) == 0 {
 | 
			
		||||
			delete(s.userIDs, userID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//import (
 | 
			
		||||
//	"context"
 | 
			
		||||
//	"encoding/json"
 | 
			
		||||
//	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
//	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
//	"github.com/openimsdk/tools/log"
 | 
			
		||||
//	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
//	"github.com/openimsdk/tools/utils/idutil"
 | 
			
		||||
//	"sync"
 | 
			
		||||
//	"time"
 | 
			
		||||
//)
 | 
			
		||||
//
 | 
			
		||||
//type subClient struct {
 | 
			
		||||
//	clients map[string]*Client
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func newSubscription() *Subscription {
 | 
			
		||||
//	return &Subscription{
 | 
			
		||||
//		userIDs: make(map[string]*subClient),
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//type Subscription struct {
 | 
			
		||||
//	lock    sync.RWMutex
 | 
			
		||||
//	userIDs map[string]*subClient
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) GetClient(userID string) []*Client {
 | 
			
		||||
//	s.lock.RLock()
 | 
			
		||||
//	defer s.lock.RUnlock()
 | 
			
		||||
//	cs, ok := s.userIDs[userID]
 | 
			
		||||
//	if !ok {
 | 
			
		||||
//		return nil
 | 
			
		||||
//	}
 | 
			
		||||
//	clients := make([]*Client, 0, len(cs.clients))
 | 
			
		||||
//	for _, client := range cs.clients {
 | 
			
		||||
//		clients = append(clients, client)
 | 
			
		||||
//	}
 | 
			
		||||
//	return clients
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) DelClient(client *Client) {
 | 
			
		||||
//	client.subLock.Lock()
 | 
			
		||||
//	userIDs := datautil.Keys(client.subUserIDs)
 | 
			
		||||
//	for _, userID := range userIDs {
 | 
			
		||||
//		delete(client.subUserIDs, userID)
 | 
			
		||||
//	}
 | 
			
		||||
//	client.subLock.Unlock()
 | 
			
		||||
//	if len(userIDs) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
//	s.lock.Lock()
 | 
			
		||||
//	defer s.lock.Unlock()
 | 
			
		||||
//	for _, userID := range userIDs {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		delete(sub.clients, addr)
 | 
			
		||||
//		if len(sub.clients) == 0 {
 | 
			
		||||
//			delete(s.userIDs, userID)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
			
		||||
//	if len(addUserIDs)+len(delUserIDs) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	var (
 | 
			
		||||
//		del = make(map[string]struct{})
 | 
			
		||||
//		add = make(map[string]struct{})
 | 
			
		||||
//	)
 | 
			
		||||
//	client.subLock.Lock()
 | 
			
		||||
//	for _, userID := range delUserIDs {
 | 
			
		||||
//		if _, ok := client.subUserIDs[userID]; !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		del[userID] = struct{}{}
 | 
			
		||||
//		delete(client.subUserIDs, userID)
 | 
			
		||||
//	}
 | 
			
		||||
//	for _, userID := range addUserIDs {
 | 
			
		||||
//		delete(del, userID)
 | 
			
		||||
//		if _, ok := client.subUserIDs[userID]; ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		client.subUserIDs[userID] = struct{}{}
 | 
			
		||||
//	}
 | 
			
		||||
//	client.subLock.Unlock()
 | 
			
		||||
//	if len(del)+len(add) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
//	s.lock.Lock()
 | 
			
		||||
//	defer s.lock.Unlock()
 | 
			
		||||
//	for userID := range del {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		delete(sub.clients, addr)
 | 
			
		||||
//		if len(sub.clients) == 0 {
 | 
			
		||||
//			delete(s.userIDs, userID)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//	for userID := range add {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			sub = &subClient{clients: make(map[string]*Client)}
 | 
			
		||||
//			s.userIDs[userID] = sub
 | 
			
		||||
//		}
 | 
			
		||||
//		sub.clients[addr] = client
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) {
 | 
			
		||||
//	clients := ws.subscription.GetClient(userID)
 | 
			
		||||
//	if len(clients) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	msgContent, err := json.Marshal(platformIDs)
 | 
			
		||||
//	if err != nil {
 | 
			
		||||
//		log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	now := time.Now().UnixMilli()
 | 
			
		||||
//	msgID := idutil.GetMsgIDByMD5(userID)
 | 
			
		||||
//	msg := &sdkws.MsgData{
 | 
			
		||||
//		SendID:           userID,
 | 
			
		||||
//		ClientMsgID:      msgID,
 | 
			
		||||
//		ServerMsgID:      msgID,
 | 
			
		||||
//		SenderPlatformID: constant.AdminPlatformID,
 | 
			
		||||
//		SessionType:      constant.NotificationChatType,
 | 
			
		||||
//		ContentType:      constant.UserSubscribeOnlineStatusNotification,
 | 
			
		||||
//		Content:          msgContent,
 | 
			
		||||
//		SendTime:         now,
 | 
			
		||||
//		CreateTime:       now,
 | 
			
		||||
//	}
 | 
			
		||||
//	for _, client := range clients {
 | 
			
		||||
//		msg.RecvID = client.UserID
 | 
			
		||||
//		if err := client.PushMessage(ctx, msg); err != nil {
 | 
			
		||||
//			log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
			
		||||
	if len(addUserIDs)+len(delUserIDs) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	var (
 | 
			
		||||
		del = make(map[string]struct{})
 | 
			
		||||
		add = make(map[string]struct{})
 | 
			
		||||
	)
 | 
			
		||||
	client.subLock.Lock()
 | 
			
		||||
	for _, userID := range delUserIDs {
 | 
			
		||||
		if _, ok := client.subUserIDs[userID]; !ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		del[userID] = struct{}{}
 | 
			
		||||
		delete(client.subUserIDs, userID)
 | 
			
		||||
	}
 | 
			
		||||
	for _, userID := range addUserIDs {
 | 
			
		||||
		delete(del, userID)
 | 
			
		||||
		if _, ok := client.subUserIDs[userID]; ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		client.subUserIDs[userID] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
	client.subLock.Unlock()
 | 
			
		||||
	if len(del)+len(add) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	for userID := range del {
 | 
			
		||||
		sub, ok := s.userIDs[userID]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		delete(sub.clients, addr)
 | 
			
		||||
		if len(sub.clients) == 0 {
 | 
			
		||||
			delete(s.userIDs, userID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for userID := range add {
 | 
			
		||||
		sub, ok := s.userIDs[userID]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			sub = &subClient{clients: make(map[string]*Client)}
 | 
			
		||||
			s.userIDs[userID] = sub
 | 
			
		||||
		}
 | 
			
		||||
		sub.clients[addr] = client
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) {
 | 
			
		||||
	clients := ws.subscription.GetClient(userID)
 | 
			
		||||
	if len(clients) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	msgContent, err := json.Marshal(platformIDs)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	now := time.Now().UnixMilli()
 | 
			
		||||
	msgID := idutil.GetMsgIDByMD5(userID)
 | 
			
		||||
	msg := &sdkws.MsgData{
 | 
			
		||||
		SendID:           userID,
 | 
			
		||||
		ClientMsgID:      msgID,
 | 
			
		||||
		ServerMsgID:      msgID,
 | 
			
		||||
		SenderPlatformID: constant.AdminPlatformID,
 | 
			
		||||
		SessionType:      constant.NotificationChatType,
 | 
			
		||||
		ContentType:      constant.UserSubscribeOnlineStatusNotification,
 | 
			
		||||
		Content:          msgContent,
 | 
			
		||||
		SendTime:         now,
 | 
			
		||||
		CreateTime:       now,
 | 
			
		||||
	}
 | 
			
		||||
	for _, client := range clients {
 | 
			
		||||
		msg.RecvID = client.UserID
 | 
			
		||||
		if err := client.PushMessage(ctx, msg); err != nil {
 | 
			
		||||
			log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -75,7 +75,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
 | 
			
		||||
	consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
 | 
			
		||||
	consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
 | 
			
		||||
	consumerHandler.config = config
 | 
			
		||||
	consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb)
 | 
			
		||||
	consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
 | 
			
		||||
	return &consumerHandler, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient) *OnlineCache {
 | 
			
		||||
func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache {
 | 
			
		||||
	x := &OnlineCache{
 | 
			
		||||
		user:  user,
 | 
			
		||||
		group: group,
 | 
			
		||||
@ -33,6 +33,9 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
 | 
			
		||||
			}
 | 
			
		||||
			storageCache := x.setUserOnline(userID, platformIDs)
 | 
			
		||||
			log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
 | 
			
		||||
			if fn != nil {
 | 
			
		||||
				fn(ctx, userID, platformIDs)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return x
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user