From 9f0201b823a7b91989fae95ea3deb77dfb47620b Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:39:42 +0800 Subject: [PATCH] feat: add a function for business info change to update related conversation's ex info. --- go.mod | 2 +- go.sum | 4 +- internal/api/msg.go | 8 -- internal/api/router.go | 2 - internal/rpc/msg/send.go | 8 +- internal/rpc/msg/stream_msg.go | 115 ------------------ pkg/common/storage/controller/stream_msg.go | 34 ------ pkg/common/storage/database/mgo/stream_msg.go | 60 --------- pkg/common/storage/database/stream_msg.go | 13 -- pkg/common/storage/model/stream_msg.go | 21 ---- 10 files changed, 5 insertions(+), 262 deletions(-) delete mode 100644 internal/rpc/msg/stream_msg.go delete mode 100644 pkg/common/storage/controller/stream_msg.go delete mode 100644 pkg/common/storage/database/mgo/stream_msg.go delete mode 100644 pkg/common/storage/database/stream_msg.go delete mode 100644 pkg/common/storage/model/stream_msg.go diff --git a/go.mod b/go.mod index 7925eb0ff..d762f9fae 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.80 + github.com/openimsdk/protocol v0.0.72-alpha.81 github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index e90f768b8..ea5a6e5b7 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.80 h1:30v74YZVF012YKAKTIy7HfOk/W5mILm+HpJtxwPPCPM= -github.com/openimsdk/protocol v0.0.72-alpha.80/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.72-alpha.81 h1:6tDuZ3Anfi1uhX/V5mWxITqJnGQPnvgeaxeqJlEHIVE= +github.com/openimsdk/protocol v0.0.72-alpha.81/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/msg.go b/internal/api/msg.go index 1ec1f44a7..090f3329b 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -551,11 +551,3 @@ func (m *MessageApi) SearchMsg(c *gin.Context) { func (m *MessageApi) GetServerTime(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) } - -func (m *MessageApi) GetStreamMsg(c *gin.Context) { - a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) -} - -func (m *MessageApi) AppendStreamMsg(c *gin.Context) { - a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) -} diff --git a/internal/api/router.go b/internal/api/router.go index 65ae67da1..657493b23 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -247,8 +247,6 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) msgGroup.POST("/get_server_time", m.GetServerTime) - msgGroup.POST("/get_stream_msg", m.GetStreamMsg) - msgGroup.POST("/append_stream_msg", m.AppendStreamMsg) } // Conversation { diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f226c4921..6b2ec30b5 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -17,6 +17,8 @@ package msg import ( "context" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" @@ -29,7 +31,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { @@ -49,11 +50,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { m.encapsulateMsgData(req.MsgData) - if req.MsgData.ContentType == constant.Stream { - if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { - return nil, err - } - } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req, before) diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go deleted file mode 100644 index 688d766c8..000000000 --- a/internal/rpc/msg/stream_msg.go +++ /dev/null @@ -1,115 +0,0 @@ -package msg - -import ( - "context" - "fmt" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/errs" -) - -const StreamDeadlineTime = time.Second * 60 * 10 - -func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { - now := time.Now() - val := &model.StreamMsg{ - ClientMsgID: msgData.ClientMsgID, - ConversationID: msgprocessor.GetConversationIDByMsg(msgData), - UserID: msgData.SendID, - CreateTime: now, - DeadlineTime: now.Add(StreamDeadlineTime), - } - return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) -} - -func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) - if err != nil { - return nil, err - } - now := time.Now() - if !res.End && res.DeadlineTime.Before(now) { - res.End = true - res.DeadlineTime = now - _ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) - } - return res, nil -} - -func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - if res.End { - return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") - } - if len(res.Packets) < int(req.StartIndex) { - return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") - } - if val := len(res.Packets) - int(req.StartIndex); val > 0 { - exist := res.Packets[int(req.StartIndex):] - for i, s := range exist { - if len(req.Packets) == 0 { - break - } - if s != req.Packets[i] { - return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) - } - req.StartIndex++ - req.Packets = req.Packets[1:] - } - } - if len(req.Packets) == 0 && res.End == req.End { - return &msg.AppendStreamMsgResp{}, nil - } - deadlineTime := time.Now().Add(StreamDeadlineTime) - if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { - return nil, err - } - conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID) - if err != nil { - return nil, err - } - tips := &sdkws.StreamMsgTips{ - ConversationID: res.ConversationID, - ClientMsgID: res.ClientMsgID, - StartIndex: req.StartIndex, - Packets: req.Packets, - End: req.End, - } - var ( - recvID string - sessionType int32 - ) - if conversation.GroupID == "" { - sessionType = constant.SingleChatType - recvID = conversation.UserID - } else { - sessionType = constant.ReadGroupChatType - recvID = conversation.GroupID - } - m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) - return &msg.AppendStreamMsgResp{}, nil -} - -func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - return &msg.GetStreamMsgResp{ - ClientMsgID: res.ClientMsgID, - ConversationID: res.ConversationID, - UserID: res.UserID, - Packets: res.Packets, - End: res.End, - CreateTime: res.CreateTime.UnixMilli(), - DeadlineTime: res.DeadlineTime.UnixMilli(), - }, nil -} diff --git a/pkg/common/storage/controller/stream_msg.go b/pkg/common/storage/controller/stream_msg.go deleted file mode 100644 index 3409ccd93..000000000 --- a/pkg/common/storage/controller/stream_msg.go +++ /dev/null @@ -1,34 +0,0 @@ -package controller - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" -) - -type StreamMsgDatabase interface { - CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error - AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error - GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) -} - -func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase { - return &streamMsgDatabase{db: db} -} - -type streamMsgDatabase struct { - db database.StreamMsg -} - -func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error { - return m.db.CreateStreamMsg(ctx, model) -} - -func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { - return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime) -} - -func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - return m.db.GetStreamMsg(ctx, clientMsgID) -} diff --git a/pkg/common/storage/database/mgo/stream_msg.go b/pkg/common/storage/database/mgo/stream_msg.go deleted file mode 100644 index c57798daa..000000000 --- a/pkg/common/storage/database/mgo/stream_msg.go +++ /dev/null @@ -1,60 +0,0 @@ -package mgo - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "time" -) - -func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) { - coll := db.Collection(database.StreamMsgName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{ - {Key: "client_msg_id", Value: 1}, - }, - Options: options.Index().SetUnique(true), - }) - if err != nil { - return nil, errs.Wrap(err) - } - return &StreamMsgMongo{coll: coll}, nil -} - -type StreamMsgMongo struct { - coll *mongo.Collection -} - -func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error { - if val.Packets == nil { - val.Packets = []string{} - } - return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val}) -} - -func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { - update := bson.M{ - "$set": bson.M{ - "end": end, - "deadline_time": deadlineTime, - }, - } - if len(packets) > 0 { - update["$push"] = bson.M{ - "packets": bson.M{ - "$each": packets, - "$position": startIndex, - }, - } - } - return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true) -} - -func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID}) -} diff --git a/pkg/common/storage/database/stream_msg.go b/pkg/common/storage/database/stream_msg.go deleted file mode 100644 index e83fffbaa..000000000 --- a/pkg/common/storage/database/stream_msg.go +++ /dev/null @@ -1,13 +0,0 @@ -package database - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" -) - -type StreamMsg interface { - CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error - AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error - GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) -} diff --git a/pkg/common/storage/model/stream_msg.go b/pkg/common/storage/model/stream_msg.go deleted file mode 100644 index c040426a4..000000000 --- a/pkg/common/storage/model/stream_msg.go +++ /dev/null @@ -1,21 +0,0 @@ -package model - -import ( - "time" -) - -const ( - StreamMsgStatusWait = 0 - StreamMsgStatusDone = 1 - StreamMsgStatusFail = 2 -) - -type StreamMsg struct { - ClientMsgID string `bson:"client_msg_id"` - ConversationID string `bson:"conversation_id"` - UserID string `bson:"user_id"` - Packets []string `bson:"packets"` - End bool `bson:"end"` - CreateTime time.Time `bson:"create_time"` - DeadlineTime time.Time `bson:"deadline_time"` -}