mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	online push
This commit is contained in:
		
							parent
							
								
									006766cd14
								
							
						
					
					
						commit
						fcda73f4bc
					
				@ -25,5 +25,4 @@ func main() {
 | 
			
		||||
	if err := cmd.NewApiCmd().Exec(); err != nil {
 | 
			
		||||
		program.ExitWithError(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -13,7 +13,7 @@ require (
 | 
			
		||||
	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
			
		||||
	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
			
		||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.20
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.24
 | 
			
		||||
	github.com/openimsdk/tools v0.0.49-alpha.25
 | 
			
		||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
			
		||||
	github.com/prometheus/client_golang v1.18.0
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
			
		||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.20 h1:skZu82sqoMhiQVEZgrRsjcfI3Grp1IpThx1LJPqETWs=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.20/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
			
		||||
 | 
			
		||||
@ -72,6 +72,8 @@ type Client struct {
 | 
			
		||||
	closed         atomic.Bool
 | 
			
		||||
	closedErr      error
 | 
			
		||||
	token          string
 | 
			
		||||
	//subLock        sync.Mutex
 | 
			
		||||
	//subUserIDs     map[string]struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ResetClient updates the client's state with new connection and context information.
 | 
			
		||||
 | 
			
		||||
@ -54,13 +54,14 @@ type LongConnServer interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WsServer struct {
 | 
			
		||||
	msgGatewayConfig  *Config
 | 
			
		||||
	port              int
 | 
			
		||||
	wsMaxConnNum      int64
 | 
			
		||||
	registerChan      chan *Client
 | 
			
		||||
	unregisterChan    chan *Client
 | 
			
		||||
	kickHandlerChan   chan *kickHandler
 | 
			
		||||
	clients           UserMap
 | 
			
		||||
	msgGatewayConfig *Config
 | 
			
		||||
	port             int
 | 
			
		||||
	wsMaxConnNum     int64
 | 
			
		||||
	registerChan     chan *Client
 | 
			
		||||
	unregisterChan   chan *Client
 | 
			
		||||
	kickHandlerChan  chan *kickHandler
 | 
			
		||||
	clients          UserMap
 | 
			
		||||
	//subscription      *Subscription
 | 
			
		||||
	clientPool        sync.Pool
 | 
			
		||||
	onlineUserNum     atomic.Int64
 | 
			
		||||
	onlineUserConnNum atomic.Int64
 | 
			
		||||
@ -141,9 +142,10 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
 | 
			
		||||
		kickHandlerChan: make(chan *kickHandler, 1000),
 | 
			
		||||
		validate:        v,
 | 
			
		||||
		clients:         newUserMap(),
 | 
			
		||||
		Compressor:      NewGzipCompressor(),
 | 
			
		||||
		Encoder:         NewGobEncoder(),
 | 
			
		||||
		webhookClient:   webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
 | 
			
		||||
		//subscription:    newSubscription(),
 | 
			
		||||
		Compressor:    NewGzipCompressor(),
 | 
			
		||||
		Encoder:       NewGobEncoder(),
 | 
			
		||||
		webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										148
									
								
								internal/msggateway/subscription.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										148
									
								
								internal/msggateway/subscription.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,148 @@
 | 
			
		||||
package msggateway
 | 
			
		||||
 | 
			
		||||
//import (
 | 
			
		||||
//	"context"
 | 
			
		||||
//	"encoding/json"
 | 
			
		||||
//	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
//	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
//	"github.com/openimsdk/tools/log"
 | 
			
		||||
//	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
//	"github.com/openimsdk/tools/utils/idutil"
 | 
			
		||||
//	"sync"
 | 
			
		||||
//	"time"
 | 
			
		||||
//)
 | 
			
		||||
//
 | 
			
		||||
//type subClient struct {
 | 
			
		||||
//	clients map[string]*Client
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func newSubscription() *Subscription {
 | 
			
		||||
//	return &Subscription{
 | 
			
		||||
//		userIDs: make(map[string]*subClient),
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//type Subscription struct {
 | 
			
		||||
//	lock    sync.RWMutex
 | 
			
		||||
//	userIDs map[string]*subClient
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) GetClient(userID string) []*Client {
 | 
			
		||||
//	s.lock.RLock()
 | 
			
		||||
//	defer s.lock.RUnlock()
 | 
			
		||||
//	cs, ok := s.userIDs[userID]
 | 
			
		||||
//	if !ok {
 | 
			
		||||
//		return nil
 | 
			
		||||
//	}
 | 
			
		||||
//	clients := make([]*Client, 0, len(cs.clients))
 | 
			
		||||
//	for _, client := range cs.clients {
 | 
			
		||||
//		clients = append(clients, client)
 | 
			
		||||
//	}
 | 
			
		||||
//	return clients
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) DelClient(client *Client) {
 | 
			
		||||
//	client.subLock.Lock()
 | 
			
		||||
//	userIDs := datautil.Keys(client.subUserIDs)
 | 
			
		||||
//	for _, userID := range userIDs {
 | 
			
		||||
//		delete(client.subUserIDs, userID)
 | 
			
		||||
//	}
 | 
			
		||||
//	client.subLock.Unlock()
 | 
			
		||||
//	if len(userIDs) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
//	s.lock.Lock()
 | 
			
		||||
//	defer s.lock.Unlock()
 | 
			
		||||
//	for _, userID := range userIDs {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		delete(sub.clients, addr)
 | 
			
		||||
//		if len(sub.clients) == 0 {
 | 
			
		||||
//			delete(s.userIDs, userID)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
			
		||||
//	if len(addUserIDs)+len(delUserIDs) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	var (
 | 
			
		||||
//		del = make(map[string]struct{})
 | 
			
		||||
//		add = make(map[string]struct{})
 | 
			
		||||
//	)
 | 
			
		||||
//	client.subLock.Lock()
 | 
			
		||||
//	for _, userID := range delUserIDs {
 | 
			
		||||
//		if _, ok := client.subUserIDs[userID]; !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		del[userID] = struct{}{}
 | 
			
		||||
//		delete(client.subUserIDs, userID)
 | 
			
		||||
//	}
 | 
			
		||||
//	for _, userID := range addUserIDs {
 | 
			
		||||
//		delete(del, userID)
 | 
			
		||||
//		if _, ok := client.subUserIDs[userID]; ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		client.subUserIDs[userID] = struct{}{}
 | 
			
		||||
//	}
 | 
			
		||||
//	client.subLock.Unlock()
 | 
			
		||||
//	if len(del)+len(add) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	addr := client.ctx.GetRemoteAddr()
 | 
			
		||||
//	s.lock.Lock()
 | 
			
		||||
//	defer s.lock.Unlock()
 | 
			
		||||
//	for userID := range del {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			continue
 | 
			
		||||
//		}
 | 
			
		||||
//		delete(sub.clients, addr)
 | 
			
		||||
//		if len(sub.clients) == 0 {
 | 
			
		||||
//			delete(s.userIDs, userID)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//	for userID := range add {
 | 
			
		||||
//		sub, ok := s.userIDs[userID]
 | 
			
		||||
//		if !ok {
 | 
			
		||||
//			sub = &subClient{clients: make(map[string]*Client)}
 | 
			
		||||
//			s.userIDs[userID] = sub
 | 
			
		||||
//		}
 | 
			
		||||
//		sub.clients[addr] = client
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
//
 | 
			
		||||
//func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) {
 | 
			
		||||
//	clients := ws.subscription.GetClient(userID)
 | 
			
		||||
//	if len(clients) == 0 {
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	msgContent, err := json.Marshal(platformIDs)
 | 
			
		||||
//	if err != nil {
 | 
			
		||||
//		log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | 
			
		||||
//		return
 | 
			
		||||
//	}
 | 
			
		||||
//	now := time.Now().UnixMilli()
 | 
			
		||||
//	msgID := idutil.GetMsgIDByMD5(userID)
 | 
			
		||||
//	msg := &sdkws.MsgData{
 | 
			
		||||
//		SendID:           userID,
 | 
			
		||||
//		ClientMsgID:      msgID,
 | 
			
		||||
//		ServerMsgID:      msgID,
 | 
			
		||||
//		SenderPlatformID: constant.AdminPlatformID,
 | 
			
		||||
//		SessionType:      constant.NotificationChatType,
 | 
			
		||||
//		ContentType:      constant.UserSubscribeOnlineStatusNotification,
 | 
			
		||||
//		Content:          msgContent,
 | 
			
		||||
//		SendTime:         now,
 | 
			
		||||
//		CreateTime:       now,
 | 
			
		||||
//	}
 | 
			
		||||
//	for _, client := range clients {
 | 
			
		||||
//		msg.RecvID = client.UserID
 | 
			
		||||
//		if err := client.PushMessage(ctx, msg); err != nil {
 | 
			
		||||
//			log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
 | 
			
		||||
//		}
 | 
			
		||||
//	}
 | 
			
		||||
//}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user