群静音设置

This commit is contained in:
hawklin2017 2026-05-12 17:24:53 +08:00
parent 542d479829
commit 25e79e4b77
15 changed files with 321 additions and 15 deletions

View File

@ -185,3 +185,11 @@ func (o *GroupApi) UnpinGroupMessage(c *gin.Context) {
func (o *GroupApi) GetGroupPinnedMessages(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetGroupPinnedMessages, o.Client)
}
func (o *GroupApi) SetGroupMute(c *gin.Context) {
a2r.Call(c, group.GroupClient.SetGroupMute, o.Client)
}
func (o *GroupApi) GetGroupMute(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetGroupMute, o.Client)
}

View File

@ -264,6 +264,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
groupRouterGroup.POST("/pin_group_message", g.PinGroupMessage)
groupRouterGroup.POST("/unpin_group_message", g.UnpinGroupMessage)
groupRouterGroup.POST("/get_group_pinned_messages", g.GetGroupPinnedMessages)
groupRouterGroup.POST("/set_mute", g.SetGroupMute)
groupRouterGroup.POST("/get_mute", g.GetGroupMute)
}
// certificate
{

View File

@ -7,7 +7,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
@ -32,6 +34,7 @@ type Config struct {
LocalCacheConfig config.LocalCache
Discovery config.Discovery
FcmConfigPath string
MongodbConfig config.Mongo `mapstructure:"mongodb"`
runTimeEnv string
}
@ -57,7 +60,17 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)
consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
groupMuteMongo, err := mgo.NewGroupMuteMongo(mgocli.GetDB())
if err != nil {
return err
}
groupMuteDB := controller.NewGroupMuteDatabase(groupMuteMongo)
consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client, groupMuteDB)
if err != nil {
return err
}

View File

@ -38,6 +38,7 @@ type ConsumerHandler struct {
offlinePusher offlinepush.OfflinePusher
onlinePusher OnlinePusher
pushDatabase controller.PushDatabase
groupMuteDB controller.GroupMuteDatabase
onlineCache *rpccache.OnlineCache
groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *rpccache.ConversationLocalCache
@ -50,7 +51,7 @@ type ConsumerHandler struct {
}
func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) {
client discovery.SvcDiscoveryRegistry, groupMuteDB controller.GroupMuteDatabase) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
var err error
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
@ -78,6 +79,7 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller
consumerHandler.groupClient = rpcli.NewGroupClient(groupConn)
consumerHandler.msgClient = rpcli.NewMsgClient(msgConn)
consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn)
consumerHandler.groupMuteDB = groupMuteDB
consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config)
@ -377,7 +379,27 @@ func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, gro
if err != nil {
return nil, err
}
return needOfflinePushUserIDs, nil
if c.groupMuteDB == nil || len(needOfflinePushUserIDs) == 0 {
return needOfflinePushUserIDs, nil
}
muted, err := c.groupMuteDB.ListActiveMutedUserIDs(ctx, groupID, needOfflinePushUserIDs)
if err != nil {
return nil, err
}
if len(muted) == 0 {
return needOfflinePushUserIDs, nil
}
mutedSet := make(map[string]struct{}, len(muted))
for _, u := range muted {
mutedSet[u] = struct{}{}
}
out := make([]string, 0, len(needOfflinePushUserIDs))
for _, u := range needOfflinePushUserIDs {
if _, ok := mutedSet[u]; !ok {
out = append(out, u)
}
}
return out, nil
}
func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {

View File

@ -60,6 +60,7 @@ import (
type groupServer struct {
pbgroup.UnimplementedGroupServer
db controller.GroupDatabase
groupMuteDB controller.GroupMuteDatabase
notification *NotificationSender
config *Config
webhookClient *webhook.Client
@ -106,6 +107,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupMuteMongo, err := mgo.NewGroupMuteMongo(mgocli.GetDB())
if err != nil {
return err
}
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
//msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
@ -141,6 +146,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
//cryptoClient: rpcli.NewCryptoClient(cryptoConn),
}
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, groupPinnedMsgDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.groupMuteDB = controller.NewGroupMuteDatabase(groupMuteMongo)
gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient)
localcache.InitLocalCache(&config.LocalCacheConfig)
pbgroup.RegisterGroupServer(server, &gs)

View File

@ -0,0 +1,65 @@
package group
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/mcontext"
)
func (s *groupServer) SetGroupMute(ctx context.Context, req *pbgroup.SetGroupMuteReq) (*pbgroup.SetGroupMuteResp, error) {
opUserID := mcontext.GetOpUserID(ctx)
if opUserID == "" {
return nil, servererrs.ErrNoPermission.WrapMsg("op user id is empty")
}
if _, err := s.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil {
return nil, err
}
if req.Duration == 0 {
return &pbgroup.SetGroupMuteResp{}, s.groupMuteDB.Delete(ctx, opUserID, req.GroupID)
}
var muteEnd int64
if req.Duration != -1 {
muteEnd = time.Now().Unix() + req.Duration
}
return &pbgroup.SetGroupMuteResp{}, s.groupMuteDB.Upsert(ctx, &model.GroupMute{
OwnerUserID: opUserID,
GroupID: req.GroupID,
MuteEndTime: muteEnd,
MuteDuration: req.Duration,
CreateTime: time.Now(),
})
}
func (s *groupServer) GetGroupMute(ctx context.Context, req *pbgroup.GetGroupMuteReq) (*pbgroup.GetGroupMuteResp, error) {
opUserID := mcontext.GetOpUserID(ctx)
if opUserID == "" {
return nil, servererrs.ErrNoPermission.WrapMsg("op user id is empty")
}
if _, err := s.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil {
return nil, err
}
rec, err := s.groupMuteDB.Get(ctx, opUserID, req.GroupID)
if err != nil {
return nil, err
}
if rec == nil {
return &pbgroup.GetGroupMuteResp{}, nil
}
now := time.Now().Unix()
if rec.MuteEndTime != 0 && rec.MuteEndTime <= now {
return &pbgroup.GetGroupMuteResp{}, nil
}
duration := rec.MuteDuration
if duration == 0 && rec.MuteEndTime == 0 {
duration = -1
}
return &pbgroup.GetGroupMuteResp{
Muted: true,
MuteEndTime: rec.MuteEndTime,
Duration: duration,
}, nil
}

View File

@ -738,10 +738,11 @@ func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (*
muteEndTime = time.Now().Unix() + req.Duration
}
return &relation.SetMuteResp{}, s.userMuteDB.Upsert(ctx, &model.UserMute{
OwnerUserID: req.OwnerUserID,
MutedUserID: req.TargetUserID,
MuteEndTime: muteEndTime,
CreateTime: time.Now(),
OwnerUserID: req.OwnerUserID,
MutedUserID: req.TargetUserID,
MuteEndTime: muteEndTime,
MuteDuration: req.Duration,
CreateTime: time.Now(),
})
}
@ -754,13 +755,17 @@ func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (*
return nil, err
}
if rec == nil {
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0}, nil
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil
}
now := time.Now().Unix()
if rec.MuteEndTime != 0 && rec.MuteEndTime <= now {
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0}, nil
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil
}
return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime}, nil
duration := rec.MuteDuration
if duration == 0 && rec.MuteEndTime == 0 {
duration = -1
}
return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: duration}, nil
}
func (s *friendServer) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) {

View File

@ -43,6 +43,7 @@ func NewPushRpcCmd() *PushRpcCmd {
WebhooksConfigFileName: &pushConfig.WebhooksConfig,
LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
DiscoveryConfigFilename: &pushConfig.Discovery,
MongodbConfigFileName: &pushConfig.MongodbConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)

View File

@ -0,0 +1,40 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// GroupMuteDatabase per-user group notification mute.
type GroupMuteDatabase interface {
Upsert(ctx context.Context, mute *model.GroupMute) error
Delete(ctx context.Context, ownerUserID, groupID string) error
ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error)
Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error)
}
type groupMuteDatabase struct {
db database.GroupMute
}
func NewGroupMuteDatabase(db database.GroupMute) GroupMuteDatabase {
return &groupMuteDatabase{db: db}
}
func (g *groupMuteDatabase) Upsert(ctx context.Context, mute *model.GroupMute) error {
return g.db.Upsert(ctx, mute)
}
func (g *groupMuteDatabase) Delete(ctx context.Context, ownerUserID, groupID string) error {
return g.db.Delete(ctx, ownerUserID, groupID)
}
func (g *groupMuteDatabase) ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) {
return g.db.ListActiveMutedUserIDs(ctx, groupID, candidateUserIDs)
}
func (g *groupMuteDatabase) Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) {
return g.db.Get(ctx, ownerUserID, groupID)
}

View File

@ -0,0 +1,17 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// GroupMute persists per-user group notification mute settings.
type GroupMute interface {
Upsert(ctx context.Context, mute *model.GroupMute) error
Delete(ctx context.Context, ownerUserID, groupID string) error
// ListActiveMutedUserIDs returns which of candidateUserIDs currently have an active mute on this group.
ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error)
// Get returns one document by owner + group; nil if not found.
Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error)
}

View File

@ -0,0 +1,109 @@
package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewGroupMuteMongo(db *mongo.Database) (database.GroupMute, error) {
coll := db.Collection(database.GroupMuteName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "group_id", Value: 1},
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
}
return &GroupMuteMgo{coll: coll}, nil
}
type GroupMuteMgo struct {
coll *mongo.Collection
}
func (g *GroupMuteMgo) Upsert(ctx context.Context, mute *model.GroupMute) error {
if mute.CreateTime.IsZero() {
mute.CreateTime = time.Now()
}
filter := bson.M{
"owner_user_id": mute.OwnerUserID,
"group_id": mute.GroupID,
}
update := bson.M{
"$set": bson.M{
"mute_end_time": mute.MuteEndTime,
"mute_duration": mute.MuteDuration,
},
"$setOnInsert": bson.M{
"owner_user_id": mute.OwnerUserID,
"group_id": mute.GroupID,
"create_time": mute.CreateTime,
},
}
_, err := g.coll.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
return errs.Wrap(err)
}
func (g *GroupMuteMgo) Delete(ctx context.Context, ownerUserID, groupID string) error {
_, err := g.coll.DeleteOne(ctx, bson.M{
"owner_user_id": ownerUserID,
"group_id": groupID,
})
return errs.Wrap(err)
}
func (g *GroupMuteMgo) ListActiveMutedUserIDs(ctx context.Context, groupID string, candidateUserIDs []string) ([]string, error) {
if len(candidateUserIDs) == 0 {
return nil, nil
}
now := time.Now().Unix()
filter := bson.M{
"group_id": groupID,
"owner_user_id": bson.M{"$in": candidateUserIDs},
"$or": bson.A{
bson.M{"mute_end_time": 0},
bson.M{"mute_end_time": bson.M{"$gt": now}},
},
}
cur, err := g.coll.Find(ctx, filter, options.Find().SetProjection(bson.M{"owner_user_id": 1, "_id": 0}))
if err != nil {
return nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var out []string
for cur.Next(ctx) {
var doc struct {
OwnerUserID string `bson:"owner_user_id"`
}
if err := cur.Decode(&doc); err != nil {
return nil, errs.Wrap(err)
}
out = append(out, doc.OwnerUserID)
}
return out, cur.Err()
}
func (g *GroupMuteMgo) Get(ctx context.Context, ownerUserID, groupID string) (*model.GroupMute, error) {
var out model.GroupMute
err := g.coll.FindOne(ctx, bson.M{
"owner_user_id": ownerUserID,
"group_id": groupID,
}).Decode(&out)
if err != nil {
if err == mongo.ErrNoDocuments {
return nil, nil
}
return nil, errs.Wrap(err)
}
return &out, nil
}

View File

@ -41,7 +41,8 @@ func (u *UserMuteMgo) Upsert(ctx context.Context, mute *model.UserMute) error {
}
update := bson.M{
"$set": bson.M{
"mute_end_time": mute.MuteEndTime,
"mute_end_time": mute.MuteEndTime,
"mute_duration": mute.MuteDuration,
},
"$setOnInsert": bson.M{
"owner_user_id": mute.OwnerUserID,

View File

@ -13,6 +13,7 @@ const (
ConversationVersionName = "conversation_version"
GroupRequestName = "group_request"
GroupPinnedMsgName = "group_pinned_msg"
GroupMuteName = "group_mute"
LogName = "log"
ObjectName = "s3"
UserName = "user"

View File

@ -0,0 +1,14 @@
package model
import "time"
// GroupMute is per-user mute of group message notifications (e.g. offline push).
// OwnerUserID must be a group member; MuteEndTime 0 means permanent.
// MuteDuration is the configured interval at set time: -1 permanent, >0 seconds.
type GroupMute struct {
OwnerUserID string `bson:"owner_user_id"`
GroupID string `bson:"group_id"`
MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent
MuteDuration int64 `bson:"mute_duration"` // -1 permanent, >0 seconds
CreateTime time.Time `bson:"create_time"`
}

View File

@ -4,9 +4,11 @@ import "time"
// UserMute records a mute relationship: OwnerUserID has muted MutedUserID.
// Works for both friends and strangers. MuteEndTime == 0 means permanent mute.
// MuteDuration is the configured interval at set time: -1 permanent, >0 seconds (0 if unset / legacy).
type UserMute struct {
OwnerUserID string `bson:"owner_user_id"` // who set the mute
MutedUserID string `bson:"muted_user_id"` // who is muted
MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent
CreateTime time.Time `bson:"create_time"`
OwnerUserID string `bson:"owner_user_id"` // who set the mute
MutedUserID string `bson:"muted_user_id"` // who is muted
MuteEndTime int64 `bson:"mute_end_time"` // Unix seconds; 0 = permanent
MuteDuration int64 `bson:"mute_duration"` // configured interval: -1 permanent, >0 seconds
CreateTime time.Time `bson:"create_time"`
}