mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge branch 'build/script' of github.com:mo3et/open-im-server into build/script
This commit is contained in:
		
						commit
						87df7c2f23
					
				| @ -22,5 +22,3 @@ longConnSvr: | |||||||
|   websocketMaxMsgLen: 4096 |   websocketMaxMsgLen: 4096 | ||||||
|   # WebSocket connection handshake timeout in seconds |   # WebSocket connection handshake timeout in seconds | ||||||
|   websocketTimeout: 10 |   websocketTimeout: 10 | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
|  | |||||||
| @ -34,8 +34,8 @@ jpns: | |||||||
| 
 | 
 | ||||||
| # iOS system push sound and badge count | # iOS system push sound and badge count | ||||||
| iosPush: | iosPush: | ||||||
|       pushSound: xxx |   pushSound: xxx | ||||||
|       badgeCount: true |   badgeCount: true | ||||||
|       production: false |   production: false | ||||||
| 
 | 
 | ||||||
| fullUserCache: true | fullUserCache: true | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package msggateway | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"runtime/debug" | 	"runtime/debug" | ||||||
| 	"sync" | 	"sync" | ||||||
| @ -69,6 +70,8 @@ type Client struct { | |||||||
| 	IsCompress     bool   `json:"isCompress"` | 	IsCompress     bool   `json:"isCompress"` | ||||||
| 	UserID         string `json:"userID"` | 	UserID         string `json:"userID"` | ||||||
| 	IsBackground   bool   `json:"isBackground"` | 	IsBackground   bool   `json:"isBackground"` | ||||||
|  | 	SDKType        string `json:"sdkType"` | ||||||
|  | 	Encoder        Encoder | ||||||
| 	ctx            *UserConnContext | 	ctx            *UserConnContext | ||||||
| 	longConnServer LongConnServer | 	longConnServer LongConnServer | ||||||
| 	closed         atomic.Bool | 	closed         atomic.Bool | ||||||
| @ -94,11 +97,17 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer | |||||||
| 	c.closed.Store(false) | 	c.closed.Store(false) | ||||||
| 	c.closedErr = nil | 	c.closedErr = nil | ||||||
| 	c.token = ctx.GetToken() | 	c.token = ctx.GetToken() | ||||||
|  | 	c.SDKType = ctx.GetSDKType() | ||||||
| 	c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) | 	c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) | ||||||
| 	c.subLock = new(sync.Mutex) | 	c.subLock = new(sync.Mutex) | ||||||
| 	if c.subUserIDs != nil { | 	if c.subUserIDs != nil { | ||||||
| 		clear(c.subUserIDs) | 		clear(c.subUserIDs) | ||||||
| 	} | 	} | ||||||
|  | 	if c.SDKType == GoSDK { | ||||||
|  | 		c.Encoder = NewGobEncoder() | ||||||
|  | 	} else { | ||||||
|  | 		c.Encoder = NewJsonEncoder() | ||||||
|  | 	} | ||||||
| 	c.subUserIDs = make(map[string]struct{}) | 	c.subUserIDs = make(map[string]struct{}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -159,9 +168,12 @@ func (c *Client) readMessage() { | |||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
| 		case MessageText: | 		case MessageText: | ||||||
| 			c.closedErr = ErrNotSupportMessageProtocol | 			_ = c.conn.SetReadDeadline(pongWait) | ||||||
| 			return | 			parseDataErr := c.handlerTextMessage(message) | ||||||
| 
 | 			if parseDataErr != nil { | ||||||
|  | 				c.closedErr = parseDataErr | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
| 		case PingMessage: | 		case PingMessage: | ||||||
| 			err := c.writePongMsg("") | 			err := c.writePongMsg("") | ||||||
| 			log.ZError(c.ctx, "writePongMsg", err) | 			log.ZError(c.ctx, "writePongMsg", err) | ||||||
| @ -188,7 +200,7 @@ func (c *Client) handleMessage(message []byte) error { | |||||||
| 	var binaryReq = getReq() | 	var binaryReq = getReq() | ||||||
| 	defer freeReq(binaryReq) | 	defer freeReq(binaryReq) | ||||||
| 
 | 
 | ||||||
| 	err := c.longConnServer.Decode(message, binaryReq) | 	err := c.Encoder.Decode(message, binaryReq) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @ -335,7 +347,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error { | |||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	encodedBuf, err := c.longConnServer.Encode(resp) | 	encodedBuf, err := c.Encoder.Encode(resp) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @ -419,3 +431,26 @@ func (c *Client) writePongMsg(appData string) error { | |||||||
| 
 | 
 | ||||||
| 	return errs.Wrap(err) | 	return errs.Wrap(err) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *Client) handlerTextMessage(b []byte) error { | ||||||
|  | 	var msg TextMessage | ||||||
|  | 	if err := json.Unmarshal(b, &msg); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	switch msg.Type { | ||||||
|  | 	case TextPong: | ||||||
|  | 		return nil | ||||||
|  | 	case TextPing: | ||||||
|  | 		msg.Type = TextPong | ||||||
|  | 		msgData, err := json.Marshal(msg) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		if err := c.conn.SetWriteDeadline(writeWait); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		return c.conn.WriteMessage(MessageText, msgData) | ||||||
|  | 	default: | ||||||
|  | 		return fmt.Errorf("not support message type %s", msg.Type) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
| @ -27,6 +27,12 @@ const ( | |||||||
| 	GzipCompressionProtocol = "gzip" | 	GzipCompressionProtocol = "gzip" | ||||||
| 	BackgroundStatus        = "isBackground" | 	BackgroundStatus        = "isBackground" | ||||||
| 	SendResponse            = "isMsgResp" | 	SendResponse            = "isMsgResp" | ||||||
|  | 	SDKType                 = "sdkType" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	GoSDK = "go" | ||||||
|  | 	JsSDK = "js" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
|  | |||||||
| @ -153,6 +153,14 @@ func (c *UserConnContext) GetCompression() bool { | |||||||
| 	return false | 	return false | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *UserConnContext) GetSDKType() string { | ||||||
|  | 	sdkType := c.Req.URL.Query().Get(SDKType) | ||||||
|  | 	if sdkType == "" { | ||||||
|  | 		sdkType = GoSDK | ||||||
|  | 	} | ||||||
|  | 	return sdkType | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *UserConnContext) ShouldSendResp() bool { | func (c *UserConnContext) ShouldSendResp() bool { | ||||||
| 	errResp, exists := c.Query(SendResponse) | 	errResp, exists := c.Query(SendResponse) | ||||||
| 	if exists { | 	if exists { | ||||||
| @ -193,7 +201,11 @@ func (c *UserConnContext) ParseEssentialArgs() error { | |||||||
| 	_, err := strconv.Atoi(platformIDStr) | 	_, err := strconv.Atoi(platformIDStr) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int") | 		return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int") | ||||||
| 
 | 	} | ||||||
|  | 	switch sdkType, _ := c.Query(SDKType); sdkType { | ||||||
|  | 	case "", GoSDK, JsSDK: | ||||||
|  | 	default: | ||||||
|  | 		return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js") | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | |||||||
| @ -17,6 +17,7 @@ package msggateway | |||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"encoding/gob" | 	"encoding/gob" | ||||||
|  | 	"encoding/json" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| ) | ) | ||||||
| @ -28,12 +29,12 @@ type Encoder interface { | |||||||
| 
 | 
 | ||||||
| type GobEncoder struct{} | type GobEncoder struct{} | ||||||
| 
 | 
 | ||||||
| func NewGobEncoder() *GobEncoder { | func NewGobEncoder() Encoder { | ||||||
| 	return &GobEncoder{} | 	return GobEncoder{} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (g *GobEncoder) Encode(data any) ([]byte, error) { | func (g GobEncoder) Encode(data any) ([]byte, error) { | ||||||
| 	buff := bytes.Buffer{} | 	var buff bytes.Buffer | ||||||
| 	enc := gob.NewEncoder(&buff) | 	enc := gob.NewEncoder(&buff) | ||||||
| 	if err := enc.Encode(data); err != nil { | 	if err := enc.Encode(data); err != nil { | ||||||
| 		return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") | 		return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") | ||||||
| @ -41,7 +42,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) { | |||||||
| 	return buff.Bytes(), nil | 	return buff.Bytes(), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { | func (g GobEncoder) Decode(encodeData []byte, decodeData any) error { | ||||||
| 	buff := bytes.NewBuffer(encodeData) | 	buff := bytes.NewBuffer(encodeData) | ||||||
| 	dec := gob.NewDecoder(buff) | 	dec := gob.NewDecoder(buff) | ||||||
| 	if err := dec.Decode(decodeData); err != nil { | 	if err := dec.Decode(decodeData); err != nil { | ||||||
| @ -49,3 +50,25 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { | |||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | type JsonEncoder struct{} | ||||||
|  | 
 | ||||||
|  | func NewJsonEncoder() Encoder { | ||||||
|  | 	return JsonEncoder{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (g JsonEncoder) Encode(data any) ([]byte, error) { | ||||||
|  | 	b, err := json.Marshal(data) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, errs.New("JsonEncoder.Encode failed", "action", "encode") | ||||||
|  | 	} | ||||||
|  | 	return b, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error { | ||||||
|  | 	err := json.Unmarshal(encodeData, decodeData) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errs.New("JsonEncoder.Decode failed", "action", "decode") | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f | |||||||
| 	return s | 	return s | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) OnlinePushMsg( | func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) { | ||||||
| 	context context.Context, |  | ||||||
| 	req *msggateway.OnlinePushMsgReq, |  | ||||||
| ) (*msggateway.OnlinePushMsgResp, error) { |  | ||||||
| 	panic("implement me") | 	panic("implement me") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) GetUsersOnlineStatus( | func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) { | ||||||
| 	ctx context.Context, |  | ||||||
| 	req *msggateway.GetUsersOnlineStatusReq, |  | ||||||
| ) (*msggateway.GetUsersOnlineStatusResp, error) { |  | ||||||
| 	if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { | 	if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { | ||||||
| 		return nil, errs.ErrNoPermission.WrapMsg("only app manager") | 		return nil, errs.ErrNoPermission.WrapMsg("only app manager") | ||||||
| 	} | 	} | ||||||
| @ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M | |||||||
| 			(client.IsBackground && client.PlatformID != constant.IOSPlatformID) { | 			(client.IsBackground && client.PlatformID != constant.IOSPlatformID) { | ||||||
| 			err := client.PushMessage(ctx, msgData) | 			err := client.PushMessage(ctx, msgData) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
|  | 				log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID) | ||||||
| 				userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) | 				userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) | ||||||
| 			} else { | 			} else { | ||||||
| 				if _, ok := s.pushTerminal[client.PlatformID]; ok { | 				if _, ok := s.pushTerminal[client.PlatformID]; ok { | ||||||
| @ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) KickUserOffline( | func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) { | ||||||
| 	ctx context.Context, |  | ||||||
| 	req *msggateway.KickUserOfflineReq, |  | ||||||
| ) (*msggateway.KickUserOfflineResp, error) { |  | ||||||
| 	for _, v := range req.KickUserIDList { | 	for _, v := range req.KickUserIDList { | ||||||
| 		clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) | 		clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) | ||||||
| 		if !ok { | 		if !ok { | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package msggateway | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/go-playground/validator/v10" | 	"github.com/go-playground/validator/v10" | ||||||
| @ -31,6 +32,16 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/utils/jsonutil" | 	"github.com/openimsdk/tools/utils/jsonutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | const ( | ||||||
|  | 	TextPing = "ping" | ||||||
|  | 	TextPong = "pong" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type TextMessage struct { | ||||||
|  | 	Type string          `json:"type"` | ||||||
|  | 	Body json.RawMessage `json:"body"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type Req struct { | type Req struct { | ||||||
| 	ReqIdentifier int32  `json:"reqIdentifier" validate:"required"` | 	ReqIdentifier int32  `json:"reqIdentifier" validate:"required"` | ||||||
| 	Token         string `json:"token"` | 	Token         string `json:"token"` | ||||||
|  | |||||||
| @ -37,7 +37,6 @@ type LongConnServer interface { | |||||||
| 	SetKickHandlerInfo(i *kickHandler) | 	SetKickHandlerInfo(i *kickHandler) | ||||||
| 	SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) | 	SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) | ||||||
| 	Compressor | 	Compressor | ||||||
| 	Encoder |  | ||||||
| 	MessageHandler | 	MessageHandler | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -61,7 +60,7 @@ type WsServer struct { | |||||||
| 	authClient        *rpcclient.Auth | 	authClient        *rpcclient.Auth | ||||||
| 	disCov            discovery.SvcDiscoveryRegistry | 	disCov            discovery.SvcDiscoveryRegistry | ||||||
| 	Compressor | 	Compressor | ||||||
| 	Encoder | 	//Encoder | ||||||
| 	MessageHandler | 	MessageHandler | ||||||
| 	webhookClient *webhook.Client | 	webhookClient *webhook.Client | ||||||
| } | } | ||||||
| @ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { | |||||||
| 		clients:         newUserMap(), | 		clients:         newUserMap(), | ||||||
| 		subscription:    newSubscription(), | 		subscription:    newSubscription(), | ||||||
| 		Compressor:      NewGzipCompressor(), | 		Compressor:      NewGzipCompressor(), | ||||||
| 		Encoder:         NewGobEncoder(), |  | ||||||
| 		webhookClient:   webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), | 		webhookClient:   webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) { | |||||||
| 
 | 
 | ||||||
| 	wg.Wait() | 	wg.Wait() | ||||||
| 
 | 
 | ||||||
| 	log.ZDebug( | 	log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load()) | ||||||
| 		client.ctx, |  | ||||||
| 		"user online", |  | ||||||
| 		"online user Num", |  | ||||||
| 		ws.onlineUserNum.Load(), |  | ||||||
| 		"online user conn Num", |  | ||||||
| 		ws.onlineUserConnNum.Load(), |  | ||||||
| 	) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func getRemoteAdders(client []*Client) string { | func getRemoteAdders(client []*Client) string { | ||||||
|  | |||||||
| @ -2,15 +2,14 @@ package controller | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" |  | ||||||
| 	"github.com/openimsdk/tools/log" |  | ||||||
| 
 |  | ||||||
| 	"github.com/golang-jwt/jwt/v4" | 	"github.com/golang-jwt/jwt/v4" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/tokenverify" | 	"github.com/openimsdk/tools/tokenverify" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user