From 11044eac586ee2bec8870cc638cb0688631cfdb5 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 21 Mar 2025 15:10:31 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20add=20a=20function=20for=20business?= =?UTF-8?q?=20info=20change=20to=20update=20related=20conve=E2=80=A6=20(#3?= =?UTF-8?q?225)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. --- go.mod | 2 +- go.sum | 4 +- internal/api/conversation.go | 5 + internal/api/msg.go | 8 -- internal/api/router.go | 6 +- internal/rpc/conversation/conversation.go | 16 ++- internal/rpc/msg/notification.go | 4 - internal/rpc/msg/send.go | 8 +- internal/rpc/msg/server.go | 10 +- internal/rpc/msg/stream_msg.go | 115 ------------------ pkg/common/storage/controller/conversation.go | 15 +++ pkg/common/storage/controller/stream_msg.go | 34 ------ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 57 +++++++-- pkg/common/storage/database/mgo/stream_msg.go | 60 --------- pkg/common/storage/database/stream_msg.go | 13 -- pkg/common/storage/model/stream_msg.go | 21 ---- 17 files changed, 95 insertions(+), 284 deletions(-) delete mode 100644 internal/rpc/msg/stream_msg.go delete mode 100644 pkg/common/storage/controller/stream_msg.go delete mode 100644 pkg/common/storage/database/mgo/stream_msg.go delete mode 100644 pkg/common/storage/database/stream_msg.go delete mode 100644 pkg/common/storage/model/stream_msg.go diff --git a/go.mod b/go.mod index 7bf9e6ef6..d762f9fae 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.79 + github.com/openimsdk/protocol v0.0.72-alpha.81 github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 2a86d97ea..ea5a6e5b7 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= -github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.72-alpha.81 h1:6tDuZ3Anfi1uhX/V5mWxITqJnGQPnvgeaxeqJlEHIVE= +github.com/openimsdk/protocol v0.0.72-alpha.81/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index f7dbc133c..f6eadf15a 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -16,6 +16,7 @@ package api import ( "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/a2r" ) @@ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) } + +func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) +} diff --git a/internal/api/msg.go b/internal/api/msg.go index 1ec1f44a7..090f3329b 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -551,11 +551,3 @@ func (m *MessageApi) SearchMsg(c *gin.Context) { func (m *MessageApi) GetServerTime(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) } - -func (m *MessageApi) GetStreamMsg(c *gin.Context) { - a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) -} - -func (m *MessageApi) AppendStreamMsg(c *gin.Context) { - a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) -} diff --git a/internal/api/router.go b/internal/api/router.go index 850c23972..657493b23 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -9,6 +9,8 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -27,7 +29,6 @@ import ( "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -246,8 +247,6 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) msgGroup.POST("/get_server_time", m.GetServerTime) - msgGroup.POST("/get_stream_msg", m.GetStreamMsg) - msgGroup.POST("/append_stream_msg", m.AppendStreamMsg) } // Conversation { @@ -264,6 +263,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) } { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 8bc3fd2a9..ca2c58878 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -22,6 +22,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -40,7 +42,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" ) type conversationServer struct { @@ -329,6 +330,19 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return &pbconversation.SetConversationsResp{}, nil } +func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) { + m := make(map[string]any) + if req.Ex != nil { + m["ex"] = req.Ex.Value + } + if len(m) > 0 { + if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil { + return nil, err + } + } + return &pbconversation.UpdateConversationsByUserResp{}, nil +} + // Get user IDs with "Do Not Disturb" enabled in super large groups. func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { return nil, errs.New("deprecated") diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index 0daafbe6c..0418823d6 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -48,7 +48,3 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv } m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) } - -func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) { - m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips) -} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f226c4921..6b2ec30b5 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -17,6 +17,8 @@ package msg import ( "context" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" @@ -29,7 +31,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { @@ -49,11 +50,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { m.encapsulateMsgData(req.MsgData) - if req.MsgData.ContentType == constant.Stream { - if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { - return nil, err - } - } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req, before) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index d1002cca3..cfc750c5b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -59,9 +59,8 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { msg.UnimplementedMsgServer - RegisterCenter discovery.Conn // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. - StreamMsgDatabase controller.StreamMsgDatabase + RegisterCenter discovery.Conn // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. @@ -117,10 +116,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB()) - if err != nil { - return err - } seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { @@ -142,7 +137,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer) s := &msgServer{ MsgDatabase: msgDatabase, - StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg), RegisterCenter: client, UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), GroupLocalCache: rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb), diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go deleted file mode 100644 index 688d766c8..000000000 --- a/internal/rpc/msg/stream_msg.go +++ /dev/null @@ -1,115 +0,0 @@ -package msg - -import ( - "context" - "fmt" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/errs" -) - -const StreamDeadlineTime = time.Second * 60 * 10 - -func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { - now := time.Now() - val := &model.StreamMsg{ - ClientMsgID: msgData.ClientMsgID, - ConversationID: msgprocessor.GetConversationIDByMsg(msgData), - UserID: msgData.SendID, - CreateTime: now, - DeadlineTime: now.Add(StreamDeadlineTime), - } - return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) -} - -func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) - if err != nil { - return nil, err - } - now := time.Now() - if !res.End && res.DeadlineTime.Before(now) { - res.End = true - res.DeadlineTime = now - _ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) - } - return res, nil -} - -func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - if res.End { - return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") - } - if len(res.Packets) < int(req.StartIndex) { - return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") - } - if val := len(res.Packets) - int(req.StartIndex); val > 0 { - exist := res.Packets[int(req.StartIndex):] - for i, s := range exist { - if len(req.Packets) == 0 { - break - } - if s != req.Packets[i] { - return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) - } - req.StartIndex++ - req.Packets = req.Packets[1:] - } - } - if len(req.Packets) == 0 && res.End == req.End { - return &msg.AppendStreamMsgResp{}, nil - } - deadlineTime := time.Now().Add(StreamDeadlineTime) - if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { - return nil, err - } - conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID) - if err != nil { - return nil, err - } - tips := &sdkws.StreamMsgTips{ - ConversationID: res.ConversationID, - ClientMsgID: res.ClientMsgID, - StartIndex: req.StartIndex, - Packets: req.Packets, - End: req.End, - } - var ( - recvID string - sessionType int32 - ) - if conversation.GroupID == "" { - sessionType = constant.SingleChatType - recvID = conversation.UserID - } else { - sessionType = constant.ReadGroupChatType - recvID = conversation.GroupID - } - m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) - return &msg.AppendStreamMsgResp{}, nil -} - -func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - return &msg.GetStreamMsgResp{ - ClientMsgID: res.ClientMsgID, - ConversationID: res.ConversationID, - UserID: res.UserID, - Packets: res.Packets, - End: res.End, - CreateTime: res.CreateTime.UnixMilli(), - DeadlineTime: res.DeadlineTime.UnixMilli(), - }, nil -} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 7578394b5..27442ca66 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -46,6 +46,9 @@ type ConversationDatabase interface { // SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is // transactional. SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error + // UpdateUserConversations updates all conversations related to a specified user. + // This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error // GetConversationIDs retrieves conversation IDs for a given user. @@ -145,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, }) } +func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { + conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + for _, conversation := range conversations { + cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) + } + return cache.ChainExecDel(ctx) +} + func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { diff --git a/pkg/common/storage/controller/stream_msg.go b/pkg/common/storage/controller/stream_msg.go deleted file mode 100644 index 3409ccd93..000000000 --- a/pkg/common/storage/controller/stream_msg.go +++ /dev/null @@ -1,34 +0,0 @@ -package controller - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" -) - -type StreamMsgDatabase interface { - CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error - AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error - GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) -} - -func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase { - return &streamMsgDatabase{db: db} -} - -type streamMsgDatabase struct { - db database.StreamMsg -} - -func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error { - return m.db.CreateStreamMsg(ctx, model) -} - -func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { - return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime) -} - -func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - return m.db.GetStreamMsg(ctx, clientMsgID) -} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 1fb53cfed..d612dfc2d 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -24,6 +24,7 @@ import ( type Conversation interface { Create(ctx context.Context, conversations []*model.Conversation) (err error) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) Update(ctx context.Context, conversation *model.Conversation) (err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 536827450..89f13ea3d 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -21,23 +21,32 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { coll := db.Collection(database.ConversationName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{ - {Key: "owner_user_id", Value: 1}, - {Key: "conversation_id", Value: 1}, + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "owner_user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "user_id", Value: 1}, + }, + Options: options.Index(), }, - Options: options.Index().SetUnique(true), }) if err != nil { return nil, errs.Wrap(err) @@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con return rows, nil } +func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { + if len(args) == 0 { + return nil, nil + } + filter := bson.M{ + "user_id": userID, + } + + conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) + if err != nil { + return nil, err + } + err = mongoutil.IncrVersion(func() error { + _, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) + if err != nil { + return err + } + return nil + }, func() error { + for _, conversation := range conversations { + if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err + } + return conversations, nil +} + func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) diff --git a/pkg/common/storage/database/mgo/stream_msg.go b/pkg/common/storage/database/mgo/stream_msg.go deleted file mode 100644 index c57798daa..000000000 --- a/pkg/common/storage/database/mgo/stream_msg.go +++ /dev/null @@ -1,60 +0,0 @@ -package mgo - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "time" -) - -func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) { - coll := db.Collection(database.StreamMsgName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{ - {Key: "client_msg_id", Value: 1}, - }, - Options: options.Index().SetUnique(true), - }) - if err != nil { - return nil, errs.Wrap(err) - } - return &StreamMsgMongo{coll: coll}, nil -} - -type StreamMsgMongo struct { - coll *mongo.Collection -} - -func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error { - if val.Packets == nil { - val.Packets = []string{} - } - return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val}) -} - -func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { - update := bson.M{ - "$set": bson.M{ - "end": end, - "deadline_time": deadlineTime, - }, - } - if len(packets) > 0 { - update["$push"] = bson.M{ - "packets": bson.M{ - "$each": packets, - "$position": startIndex, - }, - } - } - return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true) -} - -func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID}) -} diff --git a/pkg/common/storage/database/stream_msg.go b/pkg/common/storage/database/stream_msg.go deleted file mode 100644 index e83fffbaa..000000000 --- a/pkg/common/storage/database/stream_msg.go +++ /dev/null @@ -1,13 +0,0 @@ -package database - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" -) - -type StreamMsg interface { - CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error - AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error - GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) -} diff --git a/pkg/common/storage/model/stream_msg.go b/pkg/common/storage/model/stream_msg.go deleted file mode 100644 index c040426a4..000000000 --- a/pkg/common/storage/model/stream_msg.go +++ /dev/null @@ -1,21 +0,0 @@ -package model - -import ( - "time" -) - -const ( - StreamMsgStatusWait = 0 - StreamMsgStatusDone = 1 - StreamMsgStatusFail = 2 -) - -type StreamMsg struct { - ClientMsgID string `bson:"client_msg_id"` - ConversationID string `bson:"conversation_id"` - UserID string `bson:"user_id"` - Packets []string `bson:"packets"` - End bool `bson:"end"` - CreateTime time.Time `bson:"create_time"` - DeadlineTime time.Time `bson:"deadline_time"` -} From 73934fd9553fd5848a0409766c3306b3e729a93b Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 21 Mar 2025 15:19:21 +0800 Subject: [PATCH 2/3] feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. (#3239) --- internal/api/jssdk/jssdk.go | 80 +++++++++++++++++++++++++++---------- pkg/rpcli/msg.go | 4 +- 2 files changed, 62 insertions(+), 22 deletions(-) diff --git a/internal/api/jssdk/jssdk.go b/internal/api/jssdk/jssdk.go index 3c0911207..0d30b1ea0 100644 --- a/internal/api/jssdk/jssdk.go +++ b/internal/api/jssdk/jssdk.go @@ -2,10 +2,14 @@ package jssdk import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sort" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/log" + "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/msg" @@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if len(conversationIDs) == 0 { return &jssdk.GetActiveConversationsResp{}, nil } - readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) - if err != nil { - return nil, err - } + activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) if err != nil { return nil, err @@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if len(activeConversation) == 0 { return &jssdk.GetActiveConversationsResp{}, nil } + readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) + if err != nil { + return nil, err + } sortConversations := sortActiveConversations{ Conversation: activeConversation, } @@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if err != nil { return nil, err } + x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { return c.ConversationID }) @@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if !ok { continue } - var lastMsg *sdkws.MsgData if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { - lastMsg = msgList.Msgs[0] + resp = append(resp, &jssdk.ConversationMsg{ + Conversation: conv, + LastMsg: msgList.Msgs[0], + MaxSeq: c.MaxSeq, + ReadSeq: readSeq[c.ConversationID], + }) } - resp = append(resp, &jssdk.ConversationMsg{ - Conversation: conv, - LastMsg: lastMsg, - MaxSeq: c.MaxSeq, - ReadSeq: readSeq[c.ConversationID], - }) + } if err := x.fillConversations(ctx, resp); err != nil { return nil, err @@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation return nil, err } } + x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) for _, c := range conversations { - var lastMsg *sdkws.MsgData if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { - lastMsg = msgList.Msgs[0] + resp = append(resp, &jssdk.ConversationMsg{ + Conversation: c, + LastMsg: msgList.Msgs[0], + MaxSeq: maxSeqs[c.ConversationID], + ReadSeq: readSeqs[c.ConversationID], + }) } - resp = append(resp, &jssdk.ConversationMsg{ - Conversation: c, - LastMsg: lastMsg, - MaxSeq: maxSeqs[c.ConversationID], - ReadSeq: readSeqs[c.ConversationID], - }) + } if err := x.fillConversations(ctx, resp); err != nil { return nil, err @@ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation UnreadCount: unreadCount, }, nil } + +// This function checks whether the latest MaxSeq message is valid. +// If not, it needs to fetch a valid message again. +func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) { + var conversationIDs []string + + for conversationID, message := range messages { + allInValid := true + for _, data := range message.Msgs { + if data.Status < constant.MsgStatusHasDeleted { + allInValid = false + break + } + } + if allInValid { + conversationIDs = append(conversationIDs, conversationID) + } + } + if len(conversationIDs) > 0 { + resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: userID, + ConversationIDs: conversationIDs, + }) + if err != nil { + log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs) + return + } + for conversationID, message := range resp.Msgs { + messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}} + } + } + +} diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index 0c44b7c8b..e4d1ece6e 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -2,9 +2,11 @@ package rpcli import ( "context" + + "google.golang.org/grpc" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" - "google.golang.org/grpc" ) func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { From 4dc9b4586188302d0a93edf3d1f9f8f4f8380db4 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 28 Mar 2025 15:46:42 +0800 Subject: [PATCH 3/3] feat: implement stress-test tools. (#3261) * feat: implement stress-test tools. * revert config file. --- config/share.yml | 4 +- tools/stress-test/README.md | 25 ++ tools/stress-test/main.go | 452 ++++++++++++++++++++++++++++++++++++ 3 files changed, 479 insertions(+), 2 deletions(-) create mode 100644 tools/stress-test/README.md create mode 100755 tools/stress-test/main.go diff --git a/config/share.yml b/config/share.yml index a5fbeac75..0913c1e88 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,9 +1,9 @@ secret: openIM123 -imAdminUserID: [ imAdmin ] +imAdminUserID: [imAdmin] # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time multiLogin: policy: 1 # max num of tokens in one end - maxNumOneEnd: 30 \ No newline at end of file + maxNumOneEnd: 30 diff --git a/tools/stress-test/README.md b/tools/stress-test/README.md new file mode 100644 index 000000000..531233a20 --- /dev/null +++ b/tools/stress-test/README.md @@ -0,0 +1,25 @@ +# Stress Test + +## Usage + +You need set `TestTargetUserList` and `DefaultGroupID` variables. + +### Build + +```bash +go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go + +# or + +go build -o tools/stress-test/stress-test tools/stress-test/main.go +``` + +### Excute + +```bash +_output/bin/tools/linux/amd64/stress-test -c config/ + +#or + +tools/stress-test/stress-test -c config/ +``` diff --git a/tools/stress-test/main.go b/tools/stress-test/main.go new file mode 100755 index 000000000..ee58c9749 --- /dev/null +++ b/tools/stress-test/main.go @@ -0,0 +1,452 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/relation" + "github.com/openimsdk/protocol/sdkws" + pbuser "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/system/program" +) + +/* + 1. Create one user every minute + 2. Import target users as friends + 3. Add users to the default group + 4. Send a message to the default group every second, containing index and current timestamp + 5. Create a new group every minute and invite target users to join +*/ + +// !!! ATTENTION: This variable is must be added! +var ( + // Use default userIDs List for testing, need to be created. + TestTargetUserList = []string{ + "", + } + DefaultGroupID = "" // Use default group ID for testing, need to be created. +) + +var ( + ApiAddress string + + // API method + GetAdminToken = "/auth/get_admin_token" + CreateUser = "/user/user_register" + ImportFriend = "/friend/import_friend" + InviteToGroup = "/group/invite_user_to_group" + SendMsg = "/msg/send_msg" + CreateGroup = "/group/create_group" + GetUserToken = "/auth/user_token" +) + +const ( + MaxUser = 10000 + MaxGroup = 1000 + + CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user + SendMessageTicker = 1 * time.Second // Ticker is 1s in send message + CreateGroupTicker = 1 * time.Minute +) + +type BaseResp struct { + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + Data json.RawMessage `json:"data"` +} + +type StressTest struct { + Conf *conf + AdminUserID string + AdminToken string + DefaultGroupID string + DefaultSendUserID string + UserCounter int + GroupCounter int + MsgCounter int + CreatedUsers []string + CreatedGroups []string + Mutex sync.Mutex + Ctx context.Context + Cancel context.CancelFunc + HttpClient *http.Client + Wg sync.WaitGroup + Once sync.Once +} + +type conf struct { + Share config.Share + Api config.API +} + +func initConfig(configDir string) (*config.Share, *config.API, error) { + var ( + share = &config.Share{} + apiConfig = &config.API{} + ) + + err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) + if err != nil { + return nil, nil, err + } + + err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) + if err != nil { + return nil, nil, err + } + + return share, apiConfig, nil +} + +// Post Request +func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { + // Marshal body + jsonBody, err := json.Marshal(reqbody) + if err != nil { + log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("operationID", st.AdminUserID) + if st.AdminToken != "" { + req.Header.Set("token", st.AdminToken) + } + + // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) + + resp, err := st.HttpClient.Do(req) + if err != nil { + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) + return nil, err + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + log.ZError(ctx, "Failed to read response body", err, "url", url) + return nil, err + } + + var baseResp BaseResp + if err := json.Unmarshal(respBody, &baseResp); err != nil { + log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) + return nil, err + } + + if baseResp.ErrCode != 0 { + err = fmt.Errorf(baseResp.ErrMsg) + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) + return nil, err + } + + return baseResp.Data, nil +} + +func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { + req := auth.GetAdminTokenReq{ + Secret: st.Conf.Share.Secret, + UserID: st.AdminUserID, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) + if err != nil { + return "", err + } + + data := &auth.GetAdminTokenResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return "", err + } + + return data.Token, nil +} + +func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + + req := pbuser.UserRegisterReq{ + Users: []*sdkws.UserInfo{user}, + } + + _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return "", err + } + + st.UserCounter++ + return userID, nil +} + +func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { + req := relation.ImportFriendReq{ + OwnerUserID: userID, + FriendUserIDs: TestTargetUserList, + } + + _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { + req := group.InviteUserToGroupReq{ + GroupID: st.DefaultGroupID, + InvitedUserIDs: []string{userID}, + } + _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) SendMsg(ctx context.Context, userID string) error { + contentObj := map[string]any{ + "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + } + + req := map[string]any{ + "sendID": userID, + "groupID": st.DefaultGroupID, + "contentType": constant.Text, + "sessionType": constant.ReadGroupChatType, + "content": contentObj, + } + + _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) + if err != nil { + log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) + return err + } + + st.MsgCounter++ + + return nil +} + +func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { + groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) + + req := map[string]any{ + "memberUserIDs": TestTargetUserList, + "ownerUserID": userID, + "groupInfo": map[string]any{ + "groupID": groupID, + "groupName": groupID, + "groupType": constant.WorkingGroup, + }, + } + resp := group.CreateGroupResp{} + + response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) + if err != nil { + return "", err + } + + if err := json.Unmarshal(response, &resp); err != nil { + return "", err + } + + st.GroupCounter++ + + return resp.GroupInfo.GroupID, nil +} + +func main() { + var configPath string + // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") + // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") + flag.StringVar(&configPath, "c", "", "config path") + flag.Parse() + + if configPath == "" { + _, _ = fmt.Fprintln(os.Stderr, "config path is empty") + os.Exit(1) + return + } + + fmt.Printf(" Config Path: %s\n", configPath) + + share, apiConfig, err := initConfig(configPath) + if err != nil { + program.ExitWithError(err) + return + } + + ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + + defer cancel() + + st := &StressTest{ + Conf: &conf{ + Share: *share, + Api: *apiConfig, + }, + AdminUserID: share.IMAdminUserID[0], + Ctx: ctx, + Cancel: cancel, + HttpClient: &http.Client{ + Timeout: 50 * time.Second, + }, + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nReceived stop signal, stopping...") + + select { + case <-ch: + default: + close(ch) + } + + st.Cancel() + }() + + token, err := st.GetAdminToken(st.Ctx) + if err != nil { + log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) + } + + st.AdminToken = token + fmt.Println("Admin Token:", st.AdminToken) + fmt.Println("ApiAddress:", ApiAddress) + + st.DefaultGroupID = DefaultGroupID + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(CreateUserTicker) + defer ticker.Stop() + + for st.UserCounter < MaxUser { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") + return + + case <-ticker.C: + // Create User + userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) + + userCreatedID, err := st.CreateUser(st.Ctx, userID) + if err != nil { + log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) + os.Exit(1) + return + } + // fmt.Println("User Created ID:", userCreatedID) + + // Import Friend + if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { + log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) + os.Exit(1) + return + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) + os.Exit(1) + return + } + + st.Once.Do(func() { + st.DefaultSendUserID = userCreatedID + fmt.Println("Default Send User Created ID:", userCreatedID) + close(ch) + }) + } + } + }() + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(SendMessageTicker) + defer ticker.Stop() + <-ch + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") + return + + case <-ticker.C: + // Send Message + if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { + log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) + continue + } + } + } + }() + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(CreateGroupTicker) + defer ticker.Stop() + <-ch + + for st.GroupCounter < MaxGroup { + + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") + return + + case <-ticker.C: + + // Create Group + _, err := st.CreateGroup(st.Ctx, st.DefaultSendUserID) + if err != nil { + log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultSendUserID) + os.Exit(1) + return + } + + // fmt.Println("Group Created ID:", groupID) + } + } + }() + + st.Wg.Wait() +}