From 25e79e4b77d02d91673d2f583fb63970b819e7a0 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Tue, 12 May 2026 17:24:53 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E9=9D=99=E9=9F=B3=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/api/group.go | 8 ++ internal/api/router.go | 2 + internal/push/push.go | 15 ++- internal/push/push_handler.go | 26 ++++- internal/rpc/group/group.go | 6 + internal/rpc/group/group_mute.go | 65 +++++++++++ internal/rpc/relation/friend.go | 19 +-- pkg/common/cmd/push.go | 1 + pkg/common/storage/controller/group_mute.go | 40 +++++++ pkg/common/storage/database/group_mute.go | 17 +++ pkg/common/storage/database/mgo/group_mute.go | 109 ++++++++++++++++++ pkg/common/storage/database/mgo/user_mute.go | 3 +- pkg/common/storage/database/name.go | 1 + pkg/common/storage/model/group_mute.go | 14 +++ pkg/common/storage/model/user_mute.go | 10 +- 15 files changed, 321 insertions(+), 15 deletions(-) create mode 100644 internal/rpc/group/group_mute.go create mode 100644 pkg/common/storage/controller/group_mute.go create mode 100644 pkg/common/storage/database/group_mute.go create mode 100644 pkg/common/storage/database/mgo/group_mute.go create mode 100644 pkg/common/storage/model/group_mute.go diff --git a/internal/api/group.go b/internal/api/group.go index eec11353a..0e4bc4248 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -185,3 +185,11 @@ func (o *GroupApi) UnpinGroupMessage(c *gin.Context) { func (o *GroupApi) GetGroupPinnedMessages(c *gin.Context) { a2r.Call(c, group.GroupClient.GetGroupPinnedMessages, o.Client) } + +func (o *GroupApi) SetGroupMute(c *gin.Context) { + a2r.Call(c, group.GroupClient.SetGroupMute, o.Client) +} + +func (o *GroupApi) GetGroupMute(c *gin.Context) { + a2r.Call(c, group.GroupClient.GetGroupMute, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index 7eb7f1419..93366f88b 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -264,6 +264,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co groupRouterGroup.POST("/pin_group_message", g.PinGroupMessage) groupRouterGroup.POST("/unpin_group_message", g.UnpinGroupMessage) groupRouterGroup.POST("/get_group_pinned_messages", g.GetGroupPinnedMessages) + groupRouterGroup.POST("/set_mute", g.SetGroupMute) + groupRouterGroup.POST("/get_mute", g.GetGroupMute) } // certificate { diff --git a/internal/push/push.go b/internal/push/push.go index 6f8067fc8..4a75d64f8 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -7,7 +7,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" pbpush "github.com/openimsdk/protocol/push" + "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "google.golang.org/grpc" @@ -32,6 +34,7 @@ type Config struct { LocalCacheConfig config.LocalCache Discovery config.Discovery FcmConfigPath string + MongodbConfig config.Mongo `mapstructure:"mongodb"` runTimeEnv string } @@ -57,7 +60,17 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) - consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client) + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + if err != nil { + return err + } + groupMuteMongo, err := mgo.NewGroupMuteMongo(mgocli.GetDB()) + if err != nil { + return err + } + groupMuteDB := controller.NewGroupMuteDatabase(groupMuteMongo) + + consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client, groupMuteDB) if err != nil { return err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index a42d4f38b..5cfdb8ccb 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -38,6 +38,7 @@ type ConsumerHandler struct { offlinePusher offlinepush.OfflinePusher onlinePusher OnlinePusher pushDatabase controller.PushDatabase + groupMuteDB controller.GroupMuteDatabase onlineCache *rpccache.OnlineCache groupLocalCache *rpccache.GroupLocalCache conversationLocalCache *rpccache.ConversationLocalCache @@ -50,7 +51,7 @@ type ConsumerHandler struct { } func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, - client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) { + client discovery.SvcDiscoveryRegistry, groupMuteDB controller.GroupMuteDatabase) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, @@ -78,6 +79,7 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller consumerHandler.groupClient = rpcli.NewGroupClient(groupConn) consumerHandler.msgClient = rpcli.NewMsgClient(msgConn) consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn) + consumerHandler.groupMuteDB = groupMuteDB consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) @@ -377,7 +379,27 @@ func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, gro if err != nil { return nil, err } - return needOfflinePushUserIDs, nil + if c.groupMuteDB == nil || len(needOfflinePushUserIDs) == 0 { + return needOfflinePushUserIDs, nil + } + muted, err := c.groupMuteDB.ListActiveMutedUserIDs(ctx, groupID, needOfflinePushUserIDs) + if err != nil { + return nil, err + } + if len(muted) == 0 { + return needOfflinePushUserIDs, nil + } + mutedSet := make(map[string]struct{}, len(muted)) + for _, u := range muted { + mutedSet[u] = struct{}{} + } + out := make([]string, 0, len(needOfflinePushUserIDs)) + for _, u := range needOfflinePushUserIDs { + if _, ok := mutedSet[u]; !ok { + out = append(out, u) + } + } + return out, nil } func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fbe918c5e..6bb386977 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -60,6 +60,7 @@ import ( type groupServer struct { pbgroup.UnimplementedGroupServer db controller.GroupDatabase + groupMuteDB controller.GroupMuteDatabase notification *NotificationSender config *Config webhookClient *webhook.Client @@ -106,6 +107,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + groupMuteMongo, err := mgo.NewGroupMuteMongo(mgocli.GetDB()) + if err != nil { + return err + } //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) //msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) @@ -141,6 +146,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg //cryptoClient: rpcli.NewCryptoClient(cryptoConn), } gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, groupPinnedMsgDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) + gs.groupMuteDB = controller.NewGroupMuteDatabase(groupMuteMongo) gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient) localcache.InitLocalCache(&config.LocalCacheConfig) pbgroup.RegisterGroupServer(server, &gs) diff --git a/internal/rpc/group/group_mute.go b/internal/rpc/group/group_mute.go new file mode 100644 index 000000000..01e81fedf --- /dev/null +++ b/internal/rpc/group/group_mute.go @@ -0,0 +1,65 @@ +package group + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + pbgroup "github.com/openimsdk/protocol/group" + "github.com/openimsdk/tools/mcontext" +) + +func (s *groupServer) SetGroupMute(ctx context.Context, req *pbgroup.SetGroupMuteReq) (*pbgroup.SetGroupMuteResp, error) { + opUserID := mcontext.GetOpUserID(ctx) + if opUserID == "" { + return nil, servererrs.ErrNoPermission.WrapMsg("op user id is empty") + } + if _, err := s.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil { + return nil, err + } + if req.Duration == 0 { + return &pbgroup.SetGroupMuteResp{}, s.groupMuteDB.Delete(ctx, opUserID, req.GroupID) + } + var muteEnd int64 + if req.Duration != -1 { + muteEnd = time.Now().Unix() + req.Duration + } + return &pbgroup.SetGroupMuteResp{}, s.groupMuteDB.Upsert(ctx, &model.GroupMute{ + OwnerUserID: opUserID, + GroupID: req.GroupID, + MuteEndTime: muteEnd, + MuteDuration: req.Duration, + CreateTime: time.Now(), + }) +} + +func (s *groupServer) GetGroupMute(ctx context.Context, req *pbgroup.GetGroupMuteReq) (*pbgroup.GetGroupMuteResp, error) { + opUserID := mcontext.GetOpUserID(ctx) + if opUserID == "" { + return nil, servererrs.ErrNoPermission.WrapMsg("op user id is empty") + } + if _, err := s.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil { + return nil, err + } + rec, err := s.groupMuteDB.Get(ctx, opUserID, req.GroupID) + if err != nil { + return nil, err + } + if rec == nil { + return &pbgroup.GetGroupMuteResp{}, nil + } + now := time.Now().Unix() + if rec.MuteEndTime != 0 && rec.MuteEndTime <= now { + return &pbgroup.GetGroupMuteResp{}, nil + } + duration := rec.MuteDuration + if duration == 0 && rec.MuteEndTime == 0 { + duration = -1 + } + return &pbgroup.GetGroupMuteResp{ + Muted: true, + MuteEndTime: rec.MuteEndTime, + Duration: duration, + }, nil +} diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 71bdfd6e7..da6d360ea 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -738,10 +738,11 @@ func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (* muteEndTime = time.Now().Unix() + req.Duration } return &relation.SetMuteResp{}, s.userMuteDB.Upsert(ctx, &model.UserMute{ - OwnerUserID: req.OwnerUserID, - MutedUserID: req.TargetUserID, - MuteEndTime: muteEndTime, - CreateTime: time.Now(), + OwnerUserID: req.OwnerUserID, + MutedUserID: req.TargetUserID, + MuteEndTime: muteEndTime, + MuteDuration: req.Duration, + CreateTime: time.Now(), }) } @@ -754,13 +755,17 @@ func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (* return nil, err } if rec == nil { - return &relation.GetMuteResp{Muted: false, MuteEndTime: 0}, nil + return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil } now := time.Now().Unix() if rec.MuteEndTime != 0 && rec.MuteEndTime <= now { - return &relation.GetMuteResp{Muted: false, MuteEndTime: 0}, nil + return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil } - return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime}, nil + duration := rec.MuteDuration + if duration == 0 && rec.MuteEndTime == 0 { + duration = -1 + } + return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: duration}, nil } func (s *friendServer) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) { diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 62bdfceaf..0ae395bec 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -43,6 +43,7 @@ func NewPushRpcCmd() *PushRpcCmd { WebhooksConfigFileName: &pushConfig.WebhooksConfig, LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, DiscoveryConfigFilename: &pushConfig.Discovery, + MongodbConfigFileName: &pushConfig.MongodbConfig, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) diff --git a/pkg/common/storage/controller/group_mute.go b/pkg/common/storage/controller/group_mute.go new file mode 100644 index 000000000..befd928fe --- /dev/null +++ b/pkg/common/storage/controller/group_mute.go @@ -0,0 +1,40 @@ +package controller + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +// GroupMuteDatabase per-user group notification mute. +type GroupMuteDatabase interface { + Upsert(ctx context.Context, mute *model.GroupMute) error + Delete(ctx context.Context, ownerUserID, groupID string) error + ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) + Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) +} + +type groupMuteDatabase struct { + db database.GroupMute +} + +func NewGroupMuteDatabase(db database.GroupMute) GroupMuteDatabase { + return &groupMuteDatabase{db: db} +} + +func (g *groupMuteDatabase) Upsert(ctx context.Context, mute *model.GroupMute) error { + return g.db.Upsert(ctx, mute) +} + +func (g *groupMuteDatabase) Delete(ctx context.Context, ownerUserID, groupID string) error { + return g.db.Delete(ctx, ownerUserID, groupID) +} + +func (g *groupMuteDatabase) ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) { + return g.db.ListActiveMutedUserIDs(ctx, groupID, candidateUserIDs) +} + +func (g *groupMuteDatabase) Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) { + return g.db.Get(ctx, ownerUserID, groupID) +} diff --git a/pkg/common/storage/database/group_mute.go b/pkg/common/storage/database/group_mute.go new file mode 100644 index 000000000..7af696cbf --- /dev/null +++ b/pkg/common/storage/database/group_mute.go @@ -0,0 +1,17 @@ +package database + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +// GroupMute persists per-user group notification mute settings. +type GroupMute interface { + Upsert(ctx context.Context, mute *model.GroupMute) error + Delete(ctx context.Context, ownerUserID, groupID string) error + // ListActiveMutedUserIDs returns which of candidateUserIDs currently have an active mute on this group. + ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) + // Get returns one document by owner + group; nil if not found. + Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) +} diff --git a/pkg/common/storage/database/mgo/group_mute.go b/pkg/common/storage/database/mgo/group_mute.go new file mode 100644 index 000000000..9e95d150b --- /dev/null +++ b/pkg/common/storage/database/mgo/group_mute.go @@ -0,0 +1,109 @@ +package mgo + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewGroupMuteMongo(db *mongo.Database) (database.GroupMute, error) { + coll := db.Collection(database.GroupMuteName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "owner_user_id", Value: 1}, + {Key: "group_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &GroupMuteMgo{coll: coll}, nil +} + +type GroupMuteMgo struct { + coll *mongo.Collection +} + +func (g *GroupMuteMgo) Upsert(ctx context.Context, mute *model.GroupMute) error { + if mute.CreateTime.IsZero() { + mute.CreateTime = time.Now() + } + filter := bson.M{ + "owner_user_id": mute.OwnerUserID, + "group_id": mute.GroupID, + } + update := bson.M{ + "$set": bson.M{ + "mute_end_time": mute.MuteEndTime, + "mute_duration": mute.MuteDuration, + }, + "$setOnInsert": bson.M{ + "owner_user_id": mute.OwnerUserID, + "group_id": mute.GroupID, + "create_time": mute.CreateTime, + }, + } + _, err := g.coll.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) + return errs.Wrap(err) +} + +func (g *GroupMuteMgo) Delete(ctx context.Context, ownerUserID, groupID string) error { + _, err := g.coll.DeleteOne(ctx, bson.M{ + "owner_user_id": ownerUserID, + "group_id": groupID, + }) + return errs.Wrap(err) +} + +func (g *GroupMuteMgo) ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) { + if len(candidateUserIDs) == 0 { + return nil, nil + } + now := time.Now().Unix() + filter := bson.M{ + "group_id": groupID, + "owner_user_id": bson.M{"$in": candidateUserIDs}, + "$or": bson.A{ + bson.M{"mute_end_time": 0}, + bson.M{"mute_end_time": bson.M{"$gt": now}}, + }, + } + cur, err := g.coll.Find(ctx, filter, options.Find().SetProjection(bson.M{"owner_user_id": 1, "_id": 0})) + if err != nil { + return nil, errs.Wrap(err) + } + defer cur.Close(ctx) + var out []string + for cur.Next(ctx) { + var doc struct { + OwnerUserID string `bson:"owner_user_id"` + } + if err := cur.Decode(&doc); err != nil { + return nil, errs.Wrap(err) + } + out = append(out, doc.OwnerUserID) + } + return out, cur.Err() +} + +func (g *GroupMuteMgo) Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) { + var out model.GroupMute + err := g.coll.FindOne(ctx, bson.M{ + "owner_user_id": ownerUserID, + "group_id": groupID, + }).Decode(&out) + if err != nil { + if err == mongo.ErrNoDocuments { + return nil, nil + } + return nil, errs.Wrap(err) + } + return &out, nil +} diff --git a/pkg/common/storage/database/mgo/user_mute.go b/pkg/common/storage/database/mgo/user_mute.go index 5c22c43b4..3fc2d0272 100644 --- a/pkg/common/storage/database/mgo/user_mute.go +++ b/pkg/common/storage/database/mgo/user_mute.go @@ -41,7 +41,8 @@ func (u *UserMuteMgo) Upsert(ctx context.Context, mute *model.UserMute) error { } update := bson.M{ "$set": bson.M{ - "mute_end_time": mute.MuteEndTime, + "mute_end_time": mute.MuteEndTime, + "mute_duration": mute.MuteDuration, }, "$setOnInsert": bson.M{ "owner_user_id": mute.OwnerUserID, diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index a906d3f72..8df4f6a74 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -13,6 +13,7 @@ const ( ConversationVersionName = "conversation_version" GroupRequestName = "group_request" GroupPinnedMsgName = "group_pinned_msg" + GroupMuteName = "group_mute" LogName = "log" ObjectName = "s3" UserName = "user" diff --git a/pkg/common/storage/model/group_mute.go b/pkg/common/storage/model/group_mute.go new file mode 100644 index 000000000..b8ff4f887 --- /dev/null +++ b/pkg/common/storage/model/group_mute.go @@ -0,0 +1,14 @@ +package model + +import "time" + +// GroupMute is per-user mute of group message notifications (e.g. offline push). +// OwnerUserID must be a group member; MuteEndTime 0 means permanent. +// MuteDuration is the configured interval at set time: -1 permanent, >0 seconds. +type GroupMute struct { + OwnerUserID string `bson:"owner_user_id"` + GroupID string `bson:"group_id"` + MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent + MuteDuration int64 `bson:"mute_duration"` // -1 permanent, >0 seconds + CreateTime time.Time `bson:"create_time"` +} diff --git a/pkg/common/storage/model/user_mute.go b/pkg/common/storage/model/user_mute.go index b780edd69..98edff713 100644 --- a/pkg/common/storage/model/user_mute.go +++ b/pkg/common/storage/model/user_mute.go @@ -4,9 +4,11 @@ import "time" // UserMute records a mute relationship: OwnerUserID has muted MutedUserID. // Works for both friends and strangers. MuteEndTime == 0 means permanent mute. +// MuteDuration is the configured interval at set time: -1 permanent, >0 seconds (0 if unset / legacy). type UserMute struct { - OwnerUserID string `bson:"owner_user_id"` // who set the mute - MutedUserID string `bson:"muted_user_id"` // who is muted - MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent - CreateTime time.Time `bson:"create_time"` + OwnerUserID string `bson:"owner_user_id"` // who set the mute + MutedUserID string `bson:"muted_user_id"` // who is muted + MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent + MuteDuration int64 `bson:"mute_duration"` // configured interval: -1 permanent, >0 seconds + CreateTime time.Time `bson:"create_time"` }