mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	Merge branch 'main' of github.com:openimsdk/open-im-server into feat/stress-test
This commit is contained in:
		
						commit
						da6c8b113a
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | |||||||
| 	github.com/gorilla/websocket v1.5.1 | 	github.com/gorilla/websocket v1.5.1 | ||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.79 | 	github.com/openimsdk/protocol v0.0.72-alpha.81 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.74 | 	github.com/openimsdk/tools v0.0.50-alpha.74 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | |||||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= | github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= | github.com/openimsdk/protocol v0.0.72-alpha.81 h1:6tDuZ3Anfi1uhX/V5mWxITqJnGQPnvgeaxeqJlEHIVE= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | github.com/openimsdk/protocol v0.0.72-alpha.81/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= | github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package api | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/conversation" | 	"github.com/openimsdk/protocol/conversation" | ||||||
| 	"github.com/openimsdk/tools/a2r" | 	"github.com/openimsdk/tools/a2r" | ||||||
| ) | ) | ||||||
| @ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { | |||||||
| func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { | func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { | ||||||
| 	a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) | 	a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { | ||||||
|  | 	a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) | ||||||
|  | } | ||||||
|  | |||||||
| @ -2,10 +2,14 @@ package jssdk | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" |  | ||||||
| 	"sort" | 	"sort" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
|  | 
 | ||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/conversation" | 	"github.com/openimsdk/protocol/conversation" | ||||||
| 	"github.com/openimsdk/protocol/jssdk" | 	"github.com/openimsdk/protocol/jssdk" | ||||||
| 	"github.com/openimsdk/protocol/msg" | 	"github.com/openimsdk/protocol/msg" | ||||||
| @ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if len(conversationIDs) == 0 { | 	if len(conversationIDs) == 0 { | ||||||
| 		return &jssdk.GetActiveConversationsResp{}, nil | 		return &jssdk.GetActiveConversationsResp{}, nil | ||||||
| 	} | 	} | ||||||
| 	readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) | 
 | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) | 	activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if len(activeConversation) == 0 { | 	if len(activeConversation) == 0 { | ||||||
| 		return &jssdk.GetActiveConversationsResp{}, nil | 		return &jssdk.GetActiveConversationsResp{}, nil | ||||||
| 	} | 	} | ||||||
|  | 	readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
| 	sortConversations := sortActiveConversations{ | 	sortConversations := sortActiveConversations{ | ||||||
| 		Conversation: activeConversation, | 		Conversation: activeConversation, | ||||||
| 	} | 	} | ||||||
| @ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) | ||||||
| 	conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { | 	conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { | ||||||
| 		return c.ConversationID | 		return c.ConversationID | ||||||
| 	}) | 	}) | ||||||
| @ -156,17 +162,16 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 		if !ok { | 		if !ok { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		var lastMsg *sdkws.MsgData |  | ||||||
| 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | ||||||
| 			lastMsg = msgList.Msgs[0] |  | ||||||
| 		} |  | ||||||
| 			resp = append(resp, &jssdk.ConversationMsg{ | 			resp = append(resp, &jssdk.ConversationMsg{ | ||||||
| 				Conversation: conv, | 				Conversation: conv, | ||||||
| 			LastMsg:      lastMsg, | 				LastMsg:      msgList.Msgs[0], | ||||||
| 				MaxSeq:       c.MaxSeq, | 				MaxSeq:       c.MaxSeq, | ||||||
| 				ReadSeq:      readSeq[c.ConversationID], | 				ReadSeq:      readSeq[c.ConversationID], | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
| 	if err := x.fillConversations(ctx, resp); err != nil { | 	if err := x.fillConversations(ctx, resp); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -219,19 +224,19 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation | |||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) | ||||||
| 	resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) | 	resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) | ||||||
| 	for _, c := range conversations { | 	for _, c := range conversations { | ||||||
| 		var lastMsg *sdkws.MsgData |  | ||||||
| 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | ||||||
| 			lastMsg = msgList.Msgs[0] |  | ||||||
| 		} |  | ||||||
| 			resp = append(resp, &jssdk.ConversationMsg{ | 			resp = append(resp, &jssdk.ConversationMsg{ | ||||||
| 				Conversation: c, | 				Conversation: c, | ||||||
| 			LastMsg:      lastMsg, | 				LastMsg:      msgList.Msgs[0], | ||||||
| 				MaxSeq:       maxSeqs[c.ConversationID], | 				MaxSeq:       maxSeqs[c.ConversationID], | ||||||
| 				ReadSeq:      readSeqs[c.ConversationID], | 				ReadSeq:      readSeqs[c.ConversationID], | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
| 	if err := x.fillConversations(ctx, resp); err != nil { | 	if err := x.fillConversations(ctx, resp); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation | |||||||
| 		UnreadCount:   unreadCount, | 		UnreadCount:   unreadCount, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // This function checks whether the latest MaxSeq message is valid. | ||||||
|  | // If not, it needs to fetch a valid message again. | ||||||
|  | func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) { | ||||||
|  | 	var conversationIDs []string | ||||||
|  | 
 | ||||||
|  | 	for conversationID, message := range messages { | ||||||
|  | 		allInValid := true | ||||||
|  | 		for _, data := range message.Msgs { | ||||||
|  | 			if data.Status < constant.MsgStatusHasDeleted { | ||||||
|  | 				allInValid = false | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if allInValid { | ||||||
|  | 			conversationIDs = append(conversationIDs, conversationID) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(conversationIDs) > 0 { | ||||||
|  | 		resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ | ||||||
|  | 			UserID:          userID, | ||||||
|  | 			ConversationIDs: conversationIDs, | ||||||
|  | 		}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		for conversationID, message := range resp.Msgs { | ||||||
|  | 			messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | |||||||
| @ -551,11 +551,3 @@ func (m *MessageApi) SearchMsg(c *gin.Context) { | |||||||
| func (m *MessageApi) GetServerTime(c *gin.Context) { | func (m *MessageApi) GetServerTime(c *gin.Context) { | ||||||
| 	a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) | 	a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) | ||||||
| } | } | ||||||
| 
 |  | ||||||
| func (m *MessageApi) GetStreamMsg(c *gin.Context) { |  | ||||||
| 	a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *MessageApi) AppendStreamMsg(c *gin.Context) { |  | ||||||
| 	a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) |  | ||||||
| } |  | ||||||
|  | |||||||
| @ -9,6 +9,8 @@ import ( | |||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
| 	"github.com/gin-gonic/gin/binding" | 	"github.com/gin-gonic/gin/binding" | ||||||
| 	"github.com/go-playground/validator/v10" | 	"github.com/go-playground/validator/v10" | ||||||
|  | 	clientv3 "go.etcd.io/etcd/client/v3" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| @ -27,7 +29,6 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/discovery/etcd" | 	"github.com/openimsdk/tools/discovery/etcd" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mw" | 	"github.com/openimsdk/tools/mw" | ||||||
| 	clientv3 "go.etcd.io/etcd/client/v3" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| @ -246,8 +247,6 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin | |||||||
| 		msgGroup.POST("/batch_send_msg", m.BatchSendMsg) | 		msgGroup.POST("/batch_send_msg", m.BatchSendMsg) | ||||||
| 		msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) | 		msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) | ||||||
| 		msgGroup.POST("/get_server_time", m.GetServerTime) | 		msgGroup.POST("/get_server_time", m.GetServerTime) | ||||||
| 		msgGroup.POST("/get_stream_msg", m.GetStreamMsg) |  | ||||||
| 		msgGroup.POST("/append_stream_msg", m.AppendStreamMsg) |  | ||||||
| 	} | 	} | ||||||
| 	// Conversation | 	// Conversation | ||||||
| 	{ | 	{ | ||||||
| @ -264,6 +263,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin | |||||||
| 		conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) | 		conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) | ||||||
| 		conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) | 		conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) | ||||||
| 		conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) | 		conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) | ||||||
|  | 		conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	{ | 	{ | ||||||
|  | |||||||
| @ -22,6 +22,8 @@ import ( | |||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||||
| 
 | 
 | ||||||
|  | 	"google.golang.org/grpc" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" | ||||||
| @ -40,7 +42,6 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"google.golang.org/grpc" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type conversationServer struct { | type conversationServer struct { | ||||||
| @ -329,6 +330,19 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver | |||||||
| 	return &pbconversation.SetConversationsResp{}, nil | 	return &pbconversation.SetConversationsResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) { | ||||||
|  | 	m := make(map[string]any) | ||||||
|  | 	if req.Ex != nil { | ||||||
|  | 		m["ex"] = req.Ex.Value | ||||||
|  | 	} | ||||||
|  | 	if len(m) > 0 { | ||||||
|  | 		if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return &pbconversation.UpdateConversationsByUserResp{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Get user IDs with "Do Not Disturb" enabled in super large groups. | // Get user IDs with "Do Not Disturb" enabled in super large groups. | ||||||
| func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { | func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { | ||||||
| 	return nil, errs.New("deprecated") | 	return nil, errs.New("deprecated") | ||||||
|  | |||||||
| @ -48,7 +48,3 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv | |||||||
| 	} | 	} | ||||||
| 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) | 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) | ||||||
| } | } | ||||||
| 
 |  | ||||||
| func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) { |  | ||||||
| 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips) |  | ||||||
| } |  | ||||||
|  | |||||||
| @ -17,6 +17,8 @@ package msg | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 
 | 
 | ||||||
|  | 	"google.golang.org/protobuf/proto" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" | 	"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" | ||||||
| @ -29,7 +31,6 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"google.golang.org/protobuf/proto" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { | func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { | ||||||
| @ -49,11 +50,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. | |||||||
| 
 | 
 | ||||||
| func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { | func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { | ||||||
| 	m.encapsulateMsgData(req.MsgData) | 	m.encapsulateMsgData(req.MsgData) | ||||||
| 	if req.MsgData.ContentType == constant.Stream { |  | ||||||
| 		if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	switch req.MsgData.SessionType { | 	switch req.MsgData.SessionType { | ||||||
| 	case constant.SingleChatType: | 	case constant.SingleChatType: | ||||||
| 		return m.sendMsgSingleChat(ctx, req, before) | 		return m.sendMsgSingleChat(ctx, req, before) | ||||||
|  | |||||||
| @ -61,7 +61,6 @@ type msgServer struct { | |||||||
| 	msg.UnimplementedMsgServer | 	msg.UnimplementedMsgServer | ||||||
| 	RegisterCenter         discovery.Conn                   // Service discovery registry for service registration. | 	RegisterCenter         discovery.Conn                   // Service discovery registry for service registration. | ||||||
| 	MsgDatabase            controller.CommonMsgDatabase     // Interface for message database operations. | 	MsgDatabase            controller.CommonMsgDatabase     // Interface for message database operations. | ||||||
| 	StreamMsgDatabase      controller.StreamMsgDatabase |  | ||||||
| 	UserLocalCache         *rpccache.UserLocalCache         // Local cache for user data. | 	UserLocalCache         *rpccache.UserLocalCache         // Local cache for user data. | ||||||
| 	FriendLocalCache       *rpccache.FriendLocalCache       // Local cache for friend data. | 	FriendLocalCache       *rpccache.FriendLocalCache       // Local cache for friend data. | ||||||
| 	GroupLocalCache        *rpccache.GroupLocalCache        // Local cache for group data. | 	GroupLocalCache        *rpccache.GroupLocalCache        // Local cache for group data. | ||||||
| @ -117,10 +116,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB()) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) | 	seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) | ||||||
| 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -142,7 +137,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr | |||||||
| 	msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer) | 	msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer) | ||||||
| 	s := &msgServer{ | 	s := &msgServer{ | ||||||
| 		MsgDatabase:            msgDatabase, | 		MsgDatabase:            msgDatabase, | ||||||
| 		StreamMsgDatabase:      controller.NewStreamMsgDatabase(streamMsg), |  | ||||||
| 		RegisterCenter:         client, | 		RegisterCenter:         client, | ||||||
| 		UserLocalCache:         rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), | 		UserLocalCache:         rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), | ||||||
| 		GroupLocalCache:        rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb), | 		GroupLocalCache:        rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb), | ||||||
|  | |||||||
| @ -1,115 +0,0 @@ | |||||||
| package msg |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"fmt" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" |  | ||||||
| 	"github.com/openimsdk/protocol/constant" |  | ||||||
| 	"github.com/openimsdk/protocol/msg" |  | ||||||
| 	"github.com/openimsdk/protocol/sdkws" |  | ||||||
| 	"github.com/openimsdk/tools/errs" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const StreamDeadlineTime = time.Second * 60 * 10 |  | ||||||
| 
 |  | ||||||
| func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { |  | ||||||
| 	now := time.Now() |  | ||||||
| 	val := &model.StreamMsg{ |  | ||||||
| 		ClientMsgID:    msgData.ClientMsgID, |  | ||||||
| 		ConversationID: msgprocessor.GetConversationIDByMsg(msgData), |  | ||||||
| 		UserID:         msgData.SendID, |  | ||||||
| 		CreateTime:     now, |  | ||||||
| 		DeadlineTime:   now.Add(StreamDeadlineTime), |  | ||||||
| 	} |  | ||||||
| 	return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { |  | ||||||
| 	res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	now := time.Now() |  | ||||||
| 	if !res.End && res.DeadlineTime.Before(now) { |  | ||||||
| 		res.End = true |  | ||||||
| 		res.DeadlineTime = now |  | ||||||
| 		_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) |  | ||||||
| 	} |  | ||||||
| 	return res, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { |  | ||||||
| 	res, err := m.getStreamMsg(ctx, req.ClientMsgID) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	if res.End { |  | ||||||
| 		return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") |  | ||||||
| 	} |  | ||||||
| 	if len(res.Packets) < int(req.StartIndex) { |  | ||||||
| 		return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") |  | ||||||
| 	} |  | ||||||
| 	if val := len(res.Packets) - int(req.StartIndex); val > 0 { |  | ||||||
| 		exist := res.Packets[int(req.StartIndex):] |  | ||||||
| 		for i, s := range exist { |  | ||||||
| 			if len(req.Packets) == 0 { |  | ||||||
| 				break |  | ||||||
| 			} |  | ||||||
| 			if s != req.Packets[i] { |  | ||||||
| 				return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) |  | ||||||
| 			} |  | ||||||
| 			req.StartIndex++ |  | ||||||
| 			req.Packets = req.Packets[1:] |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	if len(req.Packets) == 0 && res.End == req.End { |  | ||||||
| 		return &msg.AppendStreamMsgResp{}, nil |  | ||||||
| 	} |  | ||||||
| 	deadlineTime := time.Now().Add(StreamDeadlineTime) |  | ||||||
| 	if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	tips := &sdkws.StreamMsgTips{ |  | ||||||
| 		ConversationID: res.ConversationID, |  | ||||||
| 		ClientMsgID:    res.ClientMsgID, |  | ||||||
| 		StartIndex:     req.StartIndex, |  | ||||||
| 		Packets:        req.Packets, |  | ||||||
| 		End:            req.End, |  | ||||||
| 	} |  | ||||||
| 	var ( |  | ||||||
| 		recvID      string |  | ||||||
| 		sessionType int32 |  | ||||||
| 	) |  | ||||||
| 	if conversation.GroupID == "" { |  | ||||||
| 		sessionType = constant.SingleChatType |  | ||||||
| 		recvID = conversation.UserID |  | ||||||
| 	} else { |  | ||||||
| 		sessionType = constant.ReadGroupChatType |  | ||||||
| 		recvID = conversation.GroupID |  | ||||||
| 	} |  | ||||||
| 	m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) |  | ||||||
| 	return &msg.AppendStreamMsgResp{}, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { |  | ||||||
| 	res, err := m.getStreamMsg(ctx, req.ClientMsgID) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	return &msg.GetStreamMsgResp{ |  | ||||||
| 		ClientMsgID:    res.ClientMsgID, |  | ||||||
| 		ConversationID: res.ConversationID, |  | ||||||
| 		UserID:         res.UserID, |  | ||||||
| 		Packets:        res.Packets, |  | ||||||
| 		End:            res.End, |  | ||||||
| 		CreateTime:     res.CreateTime.UnixMilli(), |  | ||||||
| 		DeadlineTime:   res.DeadlineTime.UnixMilli(), |  | ||||||
| 	}, nil |  | ||||||
| } |  | ||||||
| @ -46,6 +46,9 @@ type ConversationDatabase interface { | |||||||
| 	// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is | 	// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is | ||||||
| 	// transactional. | 	// transactional. | ||||||
| 	SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error | 	SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error | ||||||
|  | 	// UpdateUserConversations updates all conversations related to a specified user. | ||||||
|  | 	// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). | ||||||
|  | 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error | ||||||
| 	// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. | 	// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. | ||||||
| 	CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error | 	CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error | ||||||
| 	// GetConversationIDs retrieves conversation IDs for a given user. | 	// GetConversationIDs retrieves conversation IDs for a given user. | ||||||
| @ -145,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { | ||||||
|  | 	conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	cache := c.cache.CloneConversationCache() | ||||||
|  | 	for _, conversation := range conversations { | ||||||
|  | 		cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) | ||||||
|  | 	} | ||||||
|  | 	return cache.ChainExecDel(ctx) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { | func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { | ||||||
| 	_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) | 	_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | |||||||
| @ -1,34 +0,0 @@ | |||||||
| package controller |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type StreamMsgDatabase interface { |  | ||||||
| 	CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error |  | ||||||
| 	AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error |  | ||||||
| 	GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase { |  | ||||||
| 	return &streamMsgDatabase{db: db} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type streamMsgDatabase struct { |  | ||||||
| 	db database.StreamMsg |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error { |  | ||||||
| 	return m.db.CreateStreamMsg(ctx, model) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { |  | ||||||
| 	return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { |  | ||||||
| 	return m.db.GetStreamMsg(ctx, clientMsgID) |  | ||||||
| } |  | ||||||
| @ -24,6 +24,7 @@ import ( | |||||||
| type Conversation interface { | type Conversation interface { | ||||||
| 	Create(ctx context.Context, conversations []*model.Conversation) (err error) | 	Create(ctx context.Context, conversations []*model.Conversation) (err error) | ||||||
| 	UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) | 	UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) | ||||||
|  | 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) | ||||||
| 	Update(ctx context.Context, conversation *model.Conversation) (err error) | 	Update(ctx context.Context, conversation *model.Conversation) (err error) | ||||||
| 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | ||||||
| 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | ||||||
|  | |||||||
| @ -21,23 +21,32 @@ import ( | |||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 
 | 
 | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo/options" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
| 	"github.com/openimsdk/tools/db/pagination" | 	"github.com/openimsdk/tools/db/pagination" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo/options" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { | func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { | ||||||
| 	coll := db.Collection(database.ConversationName) | 	coll := db.Collection(database.ConversationName) | ||||||
| 	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ | 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ | ||||||
|  | 		{ | ||||||
| 			Keys: bson.D{ | 			Keys: bson.D{ | ||||||
| 				{Key: "owner_user_id", Value: 1}, | 				{Key: "owner_user_id", Value: 1}, | ||||||
| 				{Key: "conversation_id", Value: 1}, | 				{Key: "conversation_id", Value: 1}, | ||||||
| 			}, | 			}, | ||||||
| 			Options: options.Index().SetUnique(true), | 			Options: options.Index().SetUnique(true), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			Keys: bson.D{ | ||||||
|  | 				{Key: "user_id", Value: 1}, | ||||||
|  | 			}, | ||||||
|  | 			Options: options.Index(), | ||||||
|  | 		}, | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, errs.Wrap(err) | 		return nil, errs.Wrap(err) | ||||||
| @ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con | |||||||
| 	return rows, nil | 	return rows, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { | ||||||
|  | 	if len(args) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	filter := bson.M{ | ||||||
|  | 		"user_id": userID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	err = mongoutil.IncrVersion(func() error { | ||||||
|  | 		_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}, func() error { | ||||||
|  | 		for _, conversation := range conversations { | ||||||
|  | 			if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return conversations, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { | func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { | ||||||
| 	return mongoutil.IncrVersion(func() error { | 	return mongoutil.IncrVersion(func() error { | ||||||
| 		return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) | 		return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) | ||||||
|  | |||||||
| @ -1,60 +0,0 @@ | |||||||
| package mgo |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" |  | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" |  | ||||||
| 	"github.com/openimsdk/tools/errs" |  | ||||||
| 	"go.mongodb.org/mongo-driver/bson" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo/options" |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) { |  | ||||||
| 	coll := db.Collection(database.StreamMsgName) |  | ||||||
| 	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ |  | ||||||
| 		Keys: bson.D{ |  | ||||||
| 			{Key: "client_msg_id", Value: 1}, |  | ||||||
| 		}, |  | ||||||
| 		Options: options.Index().SetUnique(true), |  | ||||||
| 	}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, errs.Wrap(err) |  | ||||||
| 	} |  | ||||||
| 	return &StreamMsgMongo{coll: coll}, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type StreamMsgMongo struct { |  | ||||||
| 	coll *mongo.Collection |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error { |  | ||||||
| 	if val.Packets == nil { |  | ||||||
| 		val.Packets = []string{} |  | ||||||
| 	} |  | ||||||
| 	return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val}) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { |  | ||||||
| 	update := bson.M{ |  | ||||||
| 		"$set": bson.M{ |  | ||||||
| 			"end":           end, |  | ||||||
| 			"deadline_time": deadlineTime, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	if len(packets) > 0 { |  | ||||||
| 		update["$push"] = bson.M{ |  | ||||||
| 			"packets": bson.M{ |  | ||||||
| 				"$each":     packets, |  | ||||||
| 				"$position": startIndex, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { |  | ||||||
| 	return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID}) |  | ||||||
| } |  | ||||||
| @ -1,13 +0,0 @@ | |||||||
| package database |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type StreamMsg interface { |  | ||||||
| 	CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error |  | ||||||
| 	AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error |  | ||||||
| 	GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) |  | ||||||
| } |  | ||||||
| @ -1,21 +0,0 @@ | |||||||
| package model |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	StreamMsgStatusWait = 0 |  | ||||||
| 	StreamMsgStatusDone = 1 |  | ||||||
| 	StreamMsgStatusFail = 2 |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type StreamMsg struct { |  | ||||||
| 	ClientMsgID    string    `bson:"client_msg_id"` |  | ||||||
| 	ConversationID string    `bson:"conversation_id"` |  | ||||||
| 	UserID         string    `bson:"user_id"` |  | ||||||
| 	Packets        []string  `bson:"packets"` |  | ||||||
| 	End            bool      `bson:"end"` |  | ||||||
| 	CreateTime     time.Time `bson:"create_time"` |  | ||||||
| 	DeadlineTime   time.Time `bson:"deadline_time"` |  | ||||||
| } |  | ||||||
| @ -2,9 +2,11 @@ package rpcli | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 
 | ||||||
|  | 	"google.golang.org/grpc" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/msg" | 	"github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"google.golang.org/grpc" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { | func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user