mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 13:12:12 +08:00 
			
		
		
		
	Merge branch 'main' into feat/send-ctype
This commit is contained in:
		
						commit
						5b4b2020a1
					
				
							
								
								
									
										84
									
								
								.github/workflows/.github/workflows/update-version-file-on-release.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								.github/workflows/.github/workflows/update-version-file-on-release.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,84 @@ | ||||
| name: Update Version File on Release | ||||
| 
 | ||||
| on: | ||||
|   release: | ||||
|     types: [created] | ||||
| 
 | ||||
| jobs: | ||||
|   update-version: | ||||
|     runs-on: ubuntu-latest | ||||
|     env: | ||||
|       TAG_VERSION: ${{ github.event.release.tag_name }}  | ||||
|     steps: | ||||
|       # Step 1: Checkout the original repository's code | ||||
|       - name: Checkout code | ||||
|         uses: actions/checkout@v4 | ||||
|         with: | ||||
|           fetch-depth: 0 | ||||
| 
 | ||||
|       # Step 2: Set up Git with official account | ||||
|       - name: Set up Git | ||||
|         run: | | ||||
|           git config user.name "github-actions[bot]" | ||||
|           git config user.email "github-actions[bot]@users.noreply.github.com" | ||||
| 
 | ||||
|       # Step 3: Check and delete existing tag | ||||
|       - name: Check and delete existing tag | ||||
|         run: | | ||||
|           if git rev-parse ${{ env.TAG_VERSION }} >/dev/null 2>&1; then | ||||
|             git tag -d ${{ env.TAG_VERSION }} | ||||
|             git push --delete origin ${{ env.TAG_VERSION }} | ||||
|           fi | ||||
| 
 | ||||
|       # Step 4: Update version file | ||||
|       - name: Update version file | ||||
|         run: | | ||||
|           echo "${{ env.TAG_VERSION }}" > version/version | ||||
| 
 | ||||
|       # Step 5: Commit and push changes | ||||
|       - name: Commit and push changes | ||||
|         env: | ||||
|           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||
|         run: | | ||||
|           git add version/version | ||||
|           git commit -m "Update version to ${{ env.TAG_VERSION }}" | ||||
|           git push origin HEAD:${{ github.ref }} | ||||
| 
 | ||||
|       # Step 6: Create and push tag | ||||
|       - name: Create and push tag | ||||
|         env: | ||||
|           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||
|         run: | | ||||
|           git tag ${{ env.TAG_VERSION }} | ||||
|           git push origin ${{ env.TAG_VERSION }} | ||||
| 
 | ||||
|       # Step 7: Find and Publish Draft Release | ||||
|       - name: Find and Publish Draft Release | ||||
|         uses: actions/github-script@v6 | ||||
|         with: | ||||
|           github-token: ${{ secrets.GITHUB_TOKEN }} | ||||
|           script: | | ||||
|             // Get the list of releases | ||||
|             const releases = await github.rest.repos.listReleases({ | ||||
|               owner: context.repo.owner, | ||||
|               repo: context.repo.repo | ||||
|             }); | ||||
| 
 | ||||
|             // Find the draft release where the title and tag_name are the same | ||||
|             const draftRelease = releases.data.find(release =>  | ||||
|               release.draft && release.name === release.tag_name | ||||
|             ); | ||||
| 
 | ||||
|             if (draftRelease) { | ||||
|               // Publish the draft release using the release_id | ||||
|               await github.rest.repos.updateRelease({ | ||||
|                 owner: context.repo.owner, | ||||
|                 repo: context.repo.repo, | ||||
|                 release_id: draftRelease.id,  // Use release_id | ||||
|                 draft: false | ||||
|               }); | ||||
| 
 | ||||
|               core.info(`Draft Release ${draftRelease.tag_name} published successfully.`); | ||||
|             } else { | ||||
|               core.info("No matching draft release found."); | ||||
|             } | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | ||||
| 	github.com/gorilla/websocket v1.5.1 | ||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||
| 	github.com/mitchellh/mapstructure v1.5.0 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.51 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.54 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.16 | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_golang v1.18.0 | ||||
|  | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | ||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||
| github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | ||||
| github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.51 h1:G5Yjndp/FRyOJWhoQcSF2x2GvYiAIlqN0vjkvjUPycU= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.51/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||
|  | ||||
| @ -175,6 +175,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM | ||||
| 		data = apistruct.CustomElem{} | ||||
| 	case constant.Quote: | ||||
| 		data = apistruct.QuoteElem{} | ||||
| 	case constant.Stream: | ||||
| 		data = apistruct.StreamMsgElem{} | ||||
| 	case constant.OANotification: | ||||
| 		data = apistruct.OANotificationElem{} | ||||
| 		req.SessionType = constant.NotificationChatType | ||||
| @ -375,3 +377,11 @@ func (m *MessageApi) SearchMsg(c *gin.Context) { | ||||
| func (m *MessageApi) GetServerTime(c *gin.Context) { | ||||
| 	a2r.Call(msg.MsgClient.GetServerTime, m.Client, c) | ||||
| } | ||||
| 
 | ||||
| func (m *MessageApi) GetStreamMsg(c *gin.Context) { | ||||
| 	a2r.Call(msg.MsgClient.GetStreamMsg, m.Client, c) | ||||
| } | ||||
| 
 | ||||
| func (m *MessageApi) AppendStreamMsg(c *gin.Context) { | ||||
| 	a2r.Call(msg.MsgClient.AppendStreamMsg, m.Client, c) | ||||
| } | ||||
|  | ||||
| @ -222,6 +222,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En | ||||
| 		msgGroup.POST("/batch_send_msg", m.BatchSendMsg) | ||||
| 		msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) | ||||
| 		msgGroup.POST("/get_server_time", m.GetServerTime) | ||||
| 		msgGroup.POST("/get_stream_msg", m.GetStreamMsg) | ||||
| 		msgGroup.POST("/append_stream_msg", m.AppendStreamMsg) | ||||
| 	} | ||||
| 	// Conversation | ||||
| 	conversationGroup := r.Group("/conversation") | ||||
|  | ||||
| @ -327,11 +327,6 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien | ||||
| 
 | ||||
| 	switch ws.msgGatewayConfig.Share.MultiLogin.Policy { | ||||
| 	case constant.DefalutNotKick: | ||||
| 	case constant.WebAndOther: | ||||
| 		if constant.PlatformIDToClass(newClient.PlatformID) == constant.WebPlatformStr { | ||||
| 			return | ||||
| 		} | ||||
| 		fallthrough | ||||
| 	case constant.PCAndOther: | ||||
| 		if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { | ||||
| 			return | ||||
| @ -356,7 +351,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien | ||||
| 			log.ZWarn(newClient.ctx, "InvalidateToken err", err, "userID", newClient.UserID, | ||||
| 				"platformID", newClient.PlatformID) | ||||
| 		} | ||||
| 	case constant.PcMobileAndWeb: | ||||
| 	case constant.AllLoginButSameClassKick: | ||||
| 		clients, ok := ws.clients.GetAll(newClient.UserID) | ||||
| 		if !ok { | ||||
| 			return | ||||
| @ -370,21 +365,6 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien | ||||
| 			} | ||||
| 		} | ||||
| 		kickTokenFunc(kickClients) | ||||
| 
 | ||||
| 	case constant.SingleTerminalLogin: | ||||
| 		clients, ok := ws.clients.GetAll(newClient.UserID) | ||||
| 		if !ok { | ||||
| 			return | ||||
| 		} | ||||
| 		var ( | ||||
| 			kickClients []*Client | ||||
| 		) | ||||
| 		for _, client := range clients { | ||||
| 			kickClients = append(kickClients, client) | ||||
| 		} | ||||
| 		kickTokenFunc(kickClients) | ||||
| 	case constant.Customize: | ||||
| 		// todo | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -261,27 +261,35 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver | ||||
| 
 | ||||
| 	setConversationFieldsFunc := func() { | ||||
| 		if req.Conversation.RecvMsgOpt != nil { | ||||
| 			conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value | ||||
| 			m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value | ||||
| 		} | ||||
| 		if req.Conversation.AttachedInfo != nil { | ||||
| 			conversation.AttachedInfo = req.Conversation.AttachedInfo.Value | ||||
| 			m["attached_info"] = req.Conversation.AttachedInfo.Value | ||||
| 		} | ||||
| 		if req.Conversation.Ex != nil { | ||||
| 			conversation.Ex = req.Conversation.Ex.Value | ||||
| 			m["ex"] = req.Conversation.Ex.Value | ||||
| 		} | ||||
| 		if req.Conversation.IsPinned != nil { | ||||
| 			conversation.IsPinned = req.Conversation.IsPinned.Value | ||||
| 			m["is_pinned"] = req.Conversation.IsPinned.Value | ||||
| 		} | ||||
| 		if req.Conversation.GroupAtType != nil { | ||||
| 			conversation.GroupAtType = req.Conversation.GroupAtType.Value | ||||
| 			m["group_at_type"] = req.Conversation.GroupAtType.Value | ||||
| 		} | ||||
| 		if req.Conversation.MsgDestructTime != nil { | ||||
| 			conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value | ||||
| 			m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value | ||||
| 		} | ||||
| 		if req.Conversation.IsMsgDestruct != nil { | ||||
| 			conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value | ||||
| 			m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value | ||||
| 		} | ||||
| 		if req.Conversation.BurnDuration != nil { | ||||
| 			conversation.BurnDuration = req.Conversation.BurnDuration.Value | ||||
| 			m["burn_duration"] = req.Conversation.BurnDuration.Value | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv | ||||
| 	} | ||||
| 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) | ||||
| } | ||||
| 
 | ||||
| func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) { | ||||
| 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips) | ||||
| } | ||||
|  | ||||
| @ -34,6 +34,11 @@ import ( | ||||
| func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { | ||||
| 	if req.MsgData != nil { | ||||
| 		m.encapsulateMsgData(req.MsgData) | ||||
| 		if req.MsgData.ContentType == constant.Stream { | ||||
| 			if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 		} | ||||
| 		switch req.MsgData.SessionType { | ||||
| 		case constant.SingleChatType: | ||||
| 			return m.sendMsgSingleChat(ctx, req) | ||||
|  | ||||
| @ -42,8 +42,9 @@ type ( | ||||
| 
 | ||||
| 	// MsgServer encapsulates dependencies required for message handling. | ||||
| 	msgServer struct { | ||||
| 		RegisterCenter         discovery.SvcDiscoveryRegistry   // Service discovery registry for service registration. | ||||
| 		MsgDatabase            controller.CommonMsgDatabase     // Interface for message database operations. | ||||
| 		RegisterCenter         discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. | ||||
| 		MsgDatabase            controller.CommonMsgDatabase   // Interface for message database operations. | ||||
| 		StreamMsgDatabase      controller.StreamMsgDatabase | ||||
| 		Conversation           *rpcclient.ConversationRpcClient // RPC client for conversation service. | ||||
| 		UserLocalCache         *rpccache.UserLocalCache         // Local cache for user data. | ||||
| 		FriendLocalCache       *rpccache.FriendLocalCache       // Local cache for friend data. | ||||
| @ -101,6 +102,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB()) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) | ||||
| 	msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) | ||||
| 	if err != nil { | ||||
| @ -109,6 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | ||||
| 	s := &msgServer{ | ||||
| 		Conversation:           &conversationClient, | ||||
| 		MsgDatabase:            msgDatabase, | ||||
| 		StreamMsgDatabase:      controller.NewStreamMsgDatabase(streamMsg), | ||||
| 		RegisterCenter:         client, | ||||
| 		UserLocalCache:         rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb), | ||||
| 		GroupLocalCache:        rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb), | ||||
|  | ||||
							
								
								
									
										114
									
								
								internal/rpc/msg/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										114
									
								
								internal/rpc/msg/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,114 @@ | ||||
| package msg | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/protocol/msg" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const StreamDeadlineTime = time.Second * 60 * 10 | ||||
| 
 | ||||
| func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { | ||||
| 	now := time.Now() | ||||
| 	val := &model.StreamMsg{ | ||||
| 		ClientMsgID:    msgData.ClientMsgID, | ||||
| 		ConversationID: msgprocessor.GetConversationIDByMsg(msgData), | ||||
| 		UserID:         msgData.SendID, | ||||
| 		CreateTime:     now, | ||||
| 		DeadlineTime:   now.Add(StreamDeadlineTime), | ||||
| 	} | ||||
| 	return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) | ||||
| } | ||||
| 
 | ||||
| func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { | ||||
| 	res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	now := time.Now() | ||||
| 	if !res.End && res.DeadlineTime.Before(now) { | ||||
| 		res.End = true | ||||
| 		res.DeadlineTime = now | ||||
| 		_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) | ||||
| 	} | ||||
| 	return res, nil | ||||
| } | ||||
| 
 | ||||
| func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { | ||||
| 	res, err := m.getStreamMsg(ctx, req.ClientMsgID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if res.End { | ||||
| 		return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") | ||||
| 	} | ||||
| 	if len(res.Packets) < int(req.StartIndex) { | ||||
| 		return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") | ||||
| 	} | ||||
| 	if val := len(res.Packets) - int(req.StartIndex); val > 0 { | ||||
| 		exist := res.Packets[int(req.StartIndex):] | ||||
| 		for i, s := range exist { | ||||
| 			if len(req.Packets) == 0 { | ||||
| 				break | ||||
| 			} | ||||
| 			if s != req.Packets[i] { | ||||
| 				return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) | ||||
| 			} | ||||
| 			req.StartIndex++ | ||||
| 			req.Packets = req.Packets[1:] | ||||
| 		} | ||||
| 	} | ||||
| 	if len(req.Packets) == 0 && res.End == req.End { | ||||
| 		return &msg.AppendStreamMsgResp{}, nil | ||||
| 	} | ||||
| 	deadlineTime := time.Now().Add(StreamDeadlineTime) | ||||
| 	if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	conversation, err := m.Conversation.GetConversation(ctx, res.UserID, res.ConversationID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	tips := &sdkws.StreamMsgTips{ | ||||
| 		ConversationID: res.ConversationID, | ||||
| 		ClientMsgID:    res.ClientMsgID, | ||||
| 		StartIndex:     req.StartIndex, | ||||
| 		Packets:        req.Packets, | ||||
| 		End:            req.End, | ||||
| 	} | ||||
| 	var ( | ||||
| 		recvID      string | ||||
| 		sessionType int32 | ||||
| 	) | ||||
| 	if conversation.GroupID == "" { | ||||
| 		sessionType = constant.SingleChatType | ||||
| 		recvID = conversation.UserID | ||||
| 	} else { | ||||
| 		sessionType = constant.ReadGroupChatType | ||||
| 		recvID = conversation.GroupID | ||||
| 	} | ||||
| 	m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) | ||||
| 	return &msg.AppendStreamMsgResp{}, nil | ||||
| } | ||||
| 
 | ||||
| func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { | ||||
| 	res, err := m.getStreamMsg(ctx, req.ClientMsgID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &msg.GetStreamMsgResp{ | ||||
| 		ClientMsgID:    res.ClientMsgID, | ||||
| 		ConversationID: res.ConversationID, | ||||
| 		UserID:         res.UserID, | ||||
| 		Packets:        res.Packets, | ||||
| 		End:            res.End, | ||||
| 		CreateTime:     res.CreateTime.UnixMilli(), | ||||
| 		DeadlineTime:   res.DeadlineTime.UnixMilli(), | ||||
| 	}, nil | ||||
| } | ||||
| @ -83,6 +83,11 @@ type TextElem struct { | ||||
| 	Content string `json:"content" validate:"required"` | ||||
| } | ||||
| 
 | ||||
| type StreamMsgElem struct { | ||||
| 	Type    string `mapstructure:"type" validate:"required"` | ||||
| 	Content string `mapstructure:"content" validate:"required"` | ||||
| } | ||||
| 
 | ||||
| type RevokeElem struct { | ||||
| 	RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` | ||||
| } | ||||
|  | ||||
							
								
								
									
										14
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										14
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							| @ -1,17 +1,3 @@ | ||||
| // Copyright © 2023 OpenIM. All rights reserved. | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
| 
 | ||||
| package redis | ||||
| 
 | ||||
| import ( | ||||
|  | ||||
| @ -172,17 +172,8 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string | ||||
| 				kickToken = append(kickToken, ts[len(ts)-1]) | ||||
| 			} | ||||
| 		} | ||||
| 	case constant.SingleTerminalLogin: | ||||
| 		for _, ts := range loginTokenMap { | ||||
| 			kickToken = append(kickToken, ts...) | ||||
| 		} | ||||
| 	case constant.WebAndOther: | ||||
| 		unkickTerminal = constant.WebPlatformStr | ||||
| 		fallthrough | ||||
| 	case constant.PCAndOther: | ||||
| 		if unkickTerminal == "" { | ||||
| 			unkickTerminal = constant.TerminalPC | ||||
| 		} | ||||
| 		unkickTerminal = constant.TerminalPC | ||||
| 		if constant.PlatformIDToClass(platformID) != unkickTerminal { | ||||
| 			for plt, ts := range loginTokenMap { | ||||
| 				if constant.PlatformIDToClass(plt) != unkickTerminal { | ||||
| @ -214,17 +205,17 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	case constant.PcMobileAndWeb: | ||||
| 	case constant.AllLoginButSameClassKick: | ||||
| 		var ( | ||||
| 			reserved = make(map[string]bool) | ||||
| 			reserved = make(map[string]struct{}) | ||||
| 		) | ||||
| 
 | ||||
| 		for plt, ts := range loginTokenMap { | ||||
| 			if constant.PlatformIDToClass(plt) == constant.PlatformIDToClass(platformID) { | ||||
| 				kickToken = append(kickToken, ts...) | ||||
| 			} else { | ||||
| 				if !reserved[constant.PlatformIDToClass(plt)] { | ||||
| 					reserved[constant.PlatformIDToClass(plt)] = true | ||||
| 				if _, ok := reserved[constant.PlatformIDToClass(plt)]; !ok { | ||||
| 					reserved[constant.PlatformIDToClass(plt)] = struct{}{} | ||||
| 					kickToken = append(kickToken, ts[:len(ts)-1]...) | ||||
| 					continue | ||||
| 				} else { | ||||
| @ -232,22 +223,6 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 	case constant.Customize: | ||||
| 		if a.multiLogin.CustomizeLoginNum[platformID] <= 0 { | ||||
| 			return nil, nil, errs.New("Do not allow login on this end").Wrap() | ||||
| 		} | ||||
| 		for plt, ts := range loginTokenMap { | ||||
| 			l := len(ts) | ||||
| 			if platformID == plt { | ||||
| 				l++ | ||||
| 			} | ||||
| 			// a.multiLogin.CustomizeLoginNum[platformID] must > 0 | ||||
| 			limit := min(a.multiLogin.CustomizeLoginNum[plt], a.multiLogin.MaxNumOneEnd) | ||||
| 			if l > limit { | ||||
| 				kickToken = append(kickToken, ts[:l-limit]...) | ||||
| 			} | ||||
| 		} | ||||
| 	default: | ||||
| 		return nil, nil, errs.New("unknown multiLogin policy").Wrap() | ||||
| 	} | ||||
|  | ||||
| @ -443,6 +443,11 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin | ||||
| 			return 0, 0, nil, err | ||||
| 		} | ||||
| 		successMsgs = append(mongoMsgs, successMsgs...) | ||||
| 
 | ||||
| 		_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) | ||||
| 		if err != nil { | ||||
| 			return 0, 0, nil, err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return minSeq, maxSeq, successMsgs, nil | ||||
| @ -500,6 +505,11 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co | ||||
| 		} | ||||
| 
 | ||||
| 		successMsgs = append(successMsgs, mongoMsgs...) | ||||
| 
 | ||||
| 		_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) | ||||
| 		if err != nil { | ||||
| 			return 0, 0, nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	return minSeq, maxSeq, successMsgs, nil | ||||
| } | ||||
|  | ||||
							
								
								
									
										34
									
								
								pkg/common/storage/controller/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								pkg/common/storage/controller/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,34 @@ | ||||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type StreamMsgDatabase interface { | ||||
| 	CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error | ||||
| 	AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error | ||||
| 	GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) | ||||
| } | ||||
| 
 | ||||
| func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase { | ||||
| 	return &streamMsgDatabase{db: db} | ||||
| } | ||||
| 
 | ||||
| type streamMsgDatabase struct { | ||||
| 	db database.StreamMsg | ||||
| } | ||||
| 
 | ||||
| func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error { | ||||
| 	return m.db.CreateStreamMsg(ctx, model) | ||||
| } | ||||
| 
 | ||||
| func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { | ||||
| 	return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime) | ||||
| } | ||||
| 
 | ||||
| func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { | ||||
| 	return m.db.GetStreamMsg(ctx, clientMsgID) | ||||
| } | ||||
| @ -76,7 +76,7 @@ func (g *GroupMgo) Take(ctx context.Context, groupID string) (group *model.Group | ||||
| 
 | ||||
| func (g *GroupMgo) Search(ctx context.Context, keyword string, pagination pagination.Pagination) (total int64, groups []*model.Group, err error) { | ||||
| 	// Define the sorting options | ||||
| 	opts := options.Find().SetSort(bson.D{{Key: "created_at", Value: -1}}) | ||||
| 	opts := options.Find().SetSort(bson.D{{Key: "create_time", Value: -1}}) | ||||
| 
 | ||||
| 	// Perform the search with pagination and sorting | ||||
| 	return mongoutil.FindPage[*model.Group](ctx, g.coll, bson.M{ | ||||
|  | ||||
							
								
								
									
										60
									
								
								pkg/common/storage/database/mgo/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								pkg/common/storage/database/mgo/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,60 @@ | ||||
| package mgo | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"github.com/openimsdk/tools/db/mongoutil" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"go.mongodb.org/mongo-driver/mongo/options" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) { | ||||
| 	coll := db.Collection(database.StreamMsgName) | ||||
| 	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ | ||||
| 		Keys: bson.D{ | ||||
| 			{Key: "client_msg_id", Value: 1}, | ||||
| 		}, | ||||
| 		Options: options.Index().SetUnique(true), | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, errs.Wrap(err) | ||||
| 	} | ||||
| 	return &StreamMsgMongo{coll: coll}, nil | ||||
| } | ||||
| 
 | ||||
| type StreamMsgMongo struct { | ||||
| 	coll *mongo.Collection | ||||
| } | ||||
| 
 | ||||
| func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error { | ||||
| 	if val.Packets == nil { | ||||
| 		val.Packets = []string{} | ||||
| 	} | ||||
| 	return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val}) | ||||
| } | ||||
| 
 | ||||
| func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { | ||||
| 	update := bson.M{ | ||||
| 		"$set": bson.M{ | ||||
| 			"end":           end, | ||||
| 			"deadline_time": deadlineTime, | ||||
| 		}, | ||||
| 	} | ||||
| 	if len(packets) > 0 { | ||||
| 		update["$push"] = bson.M{ | ||||
| 			"packets": bson.M{ | ||||
| 				"$each":     packets, | ||||
| 				"$position": startIndex, | ||||
| 			}, | ||||
| 		} | ||||
| 	} | ||||
| 	return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true) | ||||
| } | ||||
| 
 | ||||
| func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { | ||||
| 	return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID}) | ||||
| } | ||||
| @ -17,4 +17,5 @@ const ( | ||||
| 	UserName                = "user" | ||||
| 	SeqConversationName     = "seq" | ||||
| 	SeqUserName             = "seq_user" | ||||
| 	StreamMsgName           = "stream_msg" | ||||
| ) | ||||
|  | ||||
							
								
								
									
										13
									
								
								pkg/common/storage/database/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								pkg/common/storage/database/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,13 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type StreamMsg interface { | ||||
| 	CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error | ||||
| 	AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error | ||||
| 	GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) | ||||
| } | ||||
							
								
								
									
										21
									
								
								pkg/common/storage/model/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								pkg/common/storage/model/stream_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | ||||
| package model | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	StreamMsgStatusWait = 0 | ||||
| 	StreamMsgStatusDone = 1 | ||||
| 	StreamMsgStatusFail = 2 | ||||
| ) | ||||
| 
 | ||||
| type StreamMsg struct { | ||||
| 	ClientMsgID    string    `bson:"client_msg_id"` | ||||
| 	ConversationID string    `bson:"conversation_id"` | ||||
| 	UserID         string    `bson:"user_id"` | ||||
| 	Packets        []string  `bson:"packets"` | ||||
| 	End            bool      `bson:"end"` | ||||
| 	CreateTime     time.Time `bson:"create_time"` | ||||
| 	DeadlineTime   time.Time `bson:"deadline_time"` | ||||
| } | ||||
							
								
								
									
										161
									
								
								tools/streammsg/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										161
									
								
								tools/streammsg/main.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,161 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||
| 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | ||||
| 	"github.com/openimsdk/protocol/auth" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/protocol/msg" | ||||
| 	"github.com/openimsdk/tools/apiresp" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	getAdminToken   = "/auth/get_admin_token" | ||||
| 	sendMsgApi      = "/msg/send_msg" | ||||
| 	appendStreamMsg = "/msg/append_stream_msg" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	ApiAddr = "http://127.0.0.1:10002" | ||||
| 	Token   string | ||||
| ) | ||||
| 
 | ||||
| func ApiCall[R any](api string, req any) (*R, error) { | ||||
| 	data, err := json.Marshal(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) | ||||
| 	defer cancel() | ||||
| 	request, err := http.NewRequestWithContext(ctx, http.MethodPost, ApiAddr+api, bytes.NewBuffer(data)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if Token != "" { | ||||
| 		request.Header.Set("token", Token) | ||||
| 	} | ||||
| 	request.Header.Set(constant.OperationID, uuid.New().String()) | ||||
| 	response, err := http.DefaultClient.Do(request) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer response.Body.Close() | ||||
| 	var resp R | ||||
| 	apiResponse := apiresp.ApiResponse{ | ||||
| 		Data: &resp, | ||||
| 	} | ||||
| 	if err := json.NewDecoder(response.Body).Decode(&apiResponse); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if apiResponse.ErrCode != 0 { | ||||
| 		return nil, errs.NewCodeError(apiResponse.ErrCode, apiResponse.ErrMsg) | ||||
| 	} | ||||
| 	return &resp, nil | ||||
| } | ||||
| 
 | ||||
| func main() { | ||||
| 	resp, err := ApiCall[auth.GetAdminTokenResp](getAdminToken, &auth.GetAdminTokenReq{ | ||||
| 		Secret: "openIM123", | ||||
| 		UserID: "imAdmin", | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("get admin token failed", err) | ||||
| 		return | ||||
| 	} | ||||
| 	Token = resp.Token | ||||
| 	g := gin.Default() | ||||
| 	g.POST("/callbackExample/callbackAfterSendSingleMsgCommand", toGin(handlerUserMsg)) | ||||
| 	if err := g.Run(":10006"); err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func toGin[R any](fn func(c *gin.Context, req *R) error) gin.HandlerFunc { | ||||
| 	return func(c *gin.Context) { | ||||
| 		body, err := io.ReadAll(c.Request.Body) | ||||
| 		if err != nil { | ||||
| 			c.String(http.StatusInternalServerError, err.Error()) | ||||
| 			return | ||||
| 		} | ||||
| 		fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) | ||||
| 		var req R | ||||
| 		if err := json.Unmarshal(body, &req); err != nil { | ||||
| 			c.String(http.StatusInternalServerError, err.Error()) | ||||
| 			return | ||||
| 		} | ||||
| 		if err := fn(c, &req); err != nil { | ||||
| 			c.String(http.StatusInternalServerError, err.Error()) | ||||
| 			return | ||||
| 		} | ||||
| 		c.String(http.StatusOK, "{}") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func handlerUserMsg(c *gin.Context, req *cbapi.CallbackAfterSendSingleMsgReq) error { | ||||
| 	if req.ContentType != constant.Text { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if !strings.Contains(req.Content, "stream") { | ||||
| 		return nil | ||||
| 	} | ||||
| 	apiReq := apistruct.SendMsgReq{ | ||||
| 		RecvID: req.SendID, | ||||
| 		SendMsg: apistruct.SendMsg{ | ||||
| 			SendID:           req.RecvID, | ||||
| 			SenderNickname:   "xxx", | ||||
| 			SenderFaceURL:    "", | ||||
| 			SenderPlatformID: constant.AdminPlatformID, | ||||
| 			ContentType:      constant.Stream, | ||||
| 			SessionType:      req.SessionType, | ||||
| 			SendTime:         time.Now().UnixMilli(), | ||||
| 			Content: map[string]any{ | ||||
| 				"type":    "xxx", | ||||
| 				"content": "server test stream msg", | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	go func() { | ||||
| 		if err := doPushStreamMsg(&apiReq); err != nil { | ||||
| 			fmt.Println("doPushStreamMsg failed", err) | ||||
| 			return | ||||
| 		} | ||||
| 		fmt.Println("doPushStreamMsg success") | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func doPushStreamMsg(sendReq *apistruct.SendMsgReq) error { | ||||
| 	resp, err := ApiCall[msg.SendMsgResp](sendMsgApi, sendReq) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	const num = 5 | ||||
| 	for i := 1; i <= num; i++ { | ||||
| 		_, err := ApiCall[msg.AppendStreamMsgResp](appendStreamMsg, &msg.AppendStreamMsgReq{ | ||||
| 			ClientMsgID: resp.ClientMsgID, | ||||
| 			StartIndex:  int64(i - 1), | ||||
| 			Packets: []string{ | ||||
| 				fmt.Sprintf("stream_msg_packet_%03d", i), | ||||
| 			}, | ||||
| 			End: i == num, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			fmt.Println("append stream msg failed", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num), "error", err) | ||||
| 			return err | ||||
| 		} | ||||
| 		fmt.Println("append stream msg success", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num)) | ||||
| 		time.Sleep(time.Second * 10) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user