mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-06-29 22:38:27 +08:00
Merge branch 'develop/tom' into develop/tom2
This commit is contained in:
commit
bac79841ac
@ -173,3 +173,15 @@ func (o *GroupApi) GetGroupApplicationUnhandledCount(c *gin.Context) {
|
||||
func (o *GroupApi) GetCommonGroupsWithFriend(c *gin.Context) {
|
||||
a2r.Call(c, group.GroupClient.GetCommonGroupsWithFriend, o.Client)
|
||||
}
|
||||
|
||||
func (o *GroupApi) PinGroupMessage(c *gin.Context) {
|
||||
a2r.Call(c, group.GroupClient.PinGroupMessage, o.Client)
|
||||
}
|
||||
|
||||
func (o *GroupApi) UnpinGroupMessage(c *gin.Context) {
|
||||
a2r.Call(c, group.GroupClient.UnpinGroupMessage, o.Client)
|
||||
}
|
||||
|
||||
func (o *GroupApi) GetGroupPinnedMessages(c *gin.Context) {
|
||||
a2r.Call(c, group.GroupClient.GetGroupPinnedMessages, o.Client)
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
pbcrypto "github.com/openimsdk/protocol/crypto"
|
||||
"github.com/openimsdk/protocol/group"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
pbredpacket "github.com/openimsdk/protocol/redpacket"
|
||||
"github.com/openimsdk/protocol/relation"
|
||||
"github.com/openimsdk/protocol/rtc"
|
||||
"github.com/openimsdk/protocol/third"
|
||||
@ -117,6 +118,10 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
redpacketConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.RedPacket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.New()
|
||||
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
|
||||
@ -364,6 +369,30 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
|
||||
cryptoGroup.POST("/integrity_report", cr.IntegrityReport)
|
||||
}
|
||||
|
||||
// RedPacket
|
||||
{
|
||||
rp := NewRedPacketApi(pbredpacket.NewRedPacketClient(redpacketConn))
|
||||
redpacketGroup := r.Group("/redpacket")
|
||||
redpacketGroup.POST("/create_order", rp.CreateOrder)
|
||||
redpacketGroup.POST("/created_callback", rp.CreatedCallback)
|
||||
redpacketGroup.POST("/detail", rp.GetDetail)
|
||||
redpacketGroup.POST("/issue_claim_sign", rp.IssueClaimSign)
|
||||
redpacketGroup.POST("/claim_result", rp.ClaimResult)
|
||||
redpacketGroup.POST("/request_refund", rp.RequestRefund)
|
||||
redpacketGroup.POST("/get_refund", rp.GetRefund)
|
||||
redpacketGroup.POST("/wallet_bind/challenge", rp.IssueWalletBindChallenge)
|
||||
redpacketGroup.POST("/wallet_bind/confirm", rp.ConfirmWalletBind)
|
||||
redpacketGroup.POST("/wallet_bind/detail", rp.GetWalletBinding)
|
||||
|
||||
adminGroup := redpacketGroup.Group("/admin")
|
||||
adminGroup.POST("/set_signer", rp.AdminSetSigner)
|
||||
adminGroup.POST("/set_token", rp.AdminSetToken)
|
||||
adminGroup.POST("/set_expiry", rp.AdminSetExpiry)
|
||||
adminGroup.POST("/set_allow_all_tokens", rp.AdminSetAllowAllTokens)
|
||||
adminGroup.POST("/set_native_token_enabled", rp.AdminSetNativeTokenEnabled)
|
||||
adminGroup.POST("/parse_tx_events", rp.AdminParseTxEvents)
|
||||
}
|
||||
|
||||
{
|
||||
statisticsGroup := r.Group("/statistics")
|
||||
statisticsGroup.POST("/user/register", u.UserRegisterCount)
|
||||
|
||||
@ -29,10 +29,14 @@ func NewUserGlobalBlackApi(blacklistDB controller.UserGlobalBlackDatabase, userD
|
||||
type addGlobalBlacklistReq struct {
|
||||
UserIDs []string `json:"userIDs" binding:"required,min=1"`
|
||||
Reason string `json:"reason"`
|
||||
// Status 限制类型:1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线)
|
||||
Status int32 `json:"status" binding:"required,oneof=1 2"`
|
||||
}
|
||||
|
||||
type removeGlobalBlacklistReq struct {
|
||||
UserIDs []string `json:"userIDs" binding:"required,min=1"`
|
||||
// Status 目标状态:0=恢复正常(同步从 blacklistDB 删除记录);1=冻结;2=黑名单
|
||||
Status int32 `json:"status" binding:"oneof=0 1 2"`
|
||||
}
|
||||
|
||||
type getGlobalBlacklistReq struct {
|
||||
@ -45,6 +49,8 @@ type globalBlackItem struct {
|
||||
OperatorID string `json:"operatorID"`
|
||||
Reason string `json:"reason"`
|
||||
CreateTime int64 `json:"createTime"`
|
||||
// Status 限制类型:1=冻结,2=黑名单
|
||||
Status int32 `json:"status"`
|
||||
}
|
||||
|
||||
type getGlobalBlacklistResp struct {
|
||||
@ -52,7 +58,8 @@ type getGlobalBlacklistResp struct {
|
||||
Blacks []globalBlackItem `json:"blacks"`
|
||||
}
|
||||
|
||||
// AddGlobalBlacklist 管理员将用户加入全局黑名单,并立即踢下线(所有平台 token 标记 KickedToken)
|
||||
// AddGlobalBlacklist 管理员设置用户限制状态。
|
||||
// Status=1(冻结):可登录,但不能收发消息;Status=2(黑名单):不可登录,自动踢下线,不能收发消息。
|
||||
func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) {
|
||||
var req addGlobalBlacklistReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
@ -85,31 +92,44 @@ func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) {
|
||||
Nickname: u.Nickname,
|
||||
OperatorID: operatorID,
|
||||
Reason: req.Reason,
|
||||
Status: req.Status,
|
||||
})
|
||||
}
|
||||
if err := b.blacklistDB.AddBlack(c, blacks); err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
// 黑名单写入成功后,对每个被封禁用户的所有非管理员平台执行 force_logout:
|
||||
// 1. 断开 WS 长连接(msggateway.KickUserOffline)
|
||||
// 2. 将 Redis 中该平台的所有 token 标记为 KickedToken
|
||||
for _, black := range blacks {
|
||||
for platformID := range constant.PlatformID2Name {
|
||||
if int32(platformID) == constant.AdminPlatformID {
|
||||
continue
|
||||
}
|
||||
if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil {
|
||||
// 踢下线失败不阻断主流程,记录警告即可
|
||||
log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err,
|
||||
"userID", black.UserID, "platformID", platformID)
|
||||
// 同步更新 user 集合中的状态字段
|
||||
for _, userID := range req.UserIDs {
|
||||
if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil {
|
||||
log.ZWarn(c, "AddGlobalBlacklist: UpdateByMap status failed", err,
|
||||
"userID", userID, "status", req.Status)
|
||||
}
|
||||
}
|
||||
// 仅黑名单(Status=2)需要踢下线:断开 WS 长连接并将 token 标记为 KickedToken
|
||||
if req.Status == model.UserStatusBlacklist {
|
||||
for _, black := range blacks {
|
||||
for platformID := range constant.PlatformID2Name {
|
||||
if int32(platformID) == constant.AdminPlatformID {
|
||||
continue
|
||||
}
|
||||
if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil {
|
||||
log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err,
|
||||
"userID", black.UserID, "platformID", platformID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
apiresp.GinSuccess(c, nil)
|
||||
}
|
||||
|
||||
// RemoveGlobalBlacklist 管理员从全局黑名单移除用户
|
||||
// RemoveGlobalBlacklist 管理员更新用户账号状态。
|
||||
// 执行顺序:
|
||||
// 1. 将 user 集合中的 status 字段更新为请求值
|
||||
// 2. 仅当 status == 0(恢复正常)时,才从 blacklistDB 删除该用户的限制记录
|
||||
//
|
||||
// 说明:blacklistDB 是 auth/msg 层的拦截依据;状态先落 user 集合,
|
||||
// 只有确认目标状态为"正常"时才清除黑名单记录,避免状态写入成功但记录未删导致仍被拦截。
|
||||
func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) {
|
||||
var req removeGlobalBlacklistReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
@ -120,9 +140,19 @@ func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
for _, userID := range req.UserIDs {
|
||||
if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil {
|
||||
log.ZError(c, "RemoveGlobalBlacklist: UpdateByMap status failed", err, "userID", userID, "status", req.Status)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
// 只有目标状态为 0(正常)时才删除 blacklistDB 中的限制记录
|
||||
if req.Status == model.UserStatusNormal {
|
||||
if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
apiresp.GinSuccess(c, nil)
|
||||
}
|
||||
@ -151,6 +181,7 @@ func (b *UserGlobalBlackApi) GetGlobalBlacklist(c *gin.Context) {
|
||||
OperatorID: blk.OperatorID,
|
||||
Reason: blk.Reason,
|
||||
CreateTime: blk.CreateTime.UnixMilli(),
|
||||
Status: blk.Status,
|
||||
})
|
||||
}
|
||||
apiresp.GinSuccess(c, getGlobalBlacklistResp{Total: total, Blacks: items})
|
||||
|
||||
@ -32,6 +32,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
pbauth "github.com/openimsdk/protocol/auth"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/msggateway"
|
||||
@ -140,13 +141,13 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
|
||||
return nil, errs.ErrArgs.WrapMsg("app account can`t get token")
|
||||
}
|
||||
|
||||
blocked, _ := s.blacklistDB.IsBlocked(ctx, req.UserID)
|
||||
if blocked {
|
||||
// Blacklisted users should be actively kicked to invalidate existing sessions.
|
||||
// 仅黑名单(status=2)禁止登录;冻结(status=1)允许获取 token,仅在收发消息层面拦截
|
||||
status, _ := s.blacklistDB.GetStatus(ctx, req.UserID)
|
||||
if status == model.UserStatusBlacklist {
|
||||
if kickErr := s.forceKickOffAllPlatforms(ctx, req.UserID); kickErr != nil {
|
||||
log.ZWarn(ctx, "GetUserToken forceKickOffAllPlatforms failed", kickErr, "userID", req.UserID)
|
||||
}
|
||||
log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "blocked", blocked)
|
||||
log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "status", status)
|
||||
return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + req.UserID)
|
||||
}
|
||||
token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID))
|
||||
@ -167,14 +168,13 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
|
||||
if isAdmin {
|
||||
return claims, nil
|
||||
}
|
||||
// 非管理员用户检查全局黑名单
|
||||
blocked, _ := s.blacklistDB.IsBlocked(ctx, claims.UserID)
|
||||
if blocked {
|
||||
// Blacklisted users should be actively kicked to invalidate existing sessions.
|
||||
// 非管理员用户检查全局黑名单:仅 status=2(黑名单)拦截;status=1(冻结)允许通过 token 校验
|
||||
status, _ := s.blacklistDB.GetStatus(ctx, claims.UserID)
|
||||
if status == model.UserStatusBlacklist {
|
||||
if kickErr := s.forceKickOffAllPlatforms(ctx, claims.UserID); kickErr != nil {
|
||||
log.ZWarn(ctx, "parseToken forceKickOffAllPlatforms failed", kickErr, "userID", claims.UserID)
|
||||
}
|
||||
log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "blocked", blocked)
|
||||
log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "status", status)
|
||||
return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + claims.UserID)
|
||||
}
|
||||
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID)
|
||||
|
||||
@ -101,6 +101,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
groupPinnedMsgDB, err := mgo.NewGroupPinnedMsgMongo(mgocli.GetDB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
|
||||
//msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
|
||||
@ -130,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
conversationClient: rpcli.NewConversationClient(conversationConn),
|
||||
//cryptoClient: rpcli.NewCryptoClient(cryptoConn),
|
||||
}
|
||||
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
|
||||
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, groupPinnedMsgDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
|
||||
gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient)
|
||||
localcache.InitLocalCache(&config.LocalCacheConfig)
|
||||
pbgroup.RegisterGroupServer(server, &gs)
|
||||
|
||||
@ -852,6 +852,32 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte
|
||||
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
|
||||
}
|
||||
|
||||
// GroupMessagePinnedNotification 通知群成员有消息被置顶或取消置顶
|
||||
// pinType: 1=置顶, 2=取消置顶
|
||||
func (g *NotificationSender) GroupMessagePinnedNotification(ctx context.Context, groupID string, pinType int32,
|
||||
pinned *sdkws.GroupPinnedMsgInfo, pinnedList []*sdkws.GroupPinnedMsgInfo) {
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
groupInfo, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tips := &sdkws.GroupMessagePinnedTips{
|
||||
Group: groupInfo,
|
||||
Type: pinType,
|
||||
PinnedMsg: pinned,
|
||||
PinnedList: pinnedList,
|
||||
}
|
||||
if err = g.fillOpUser(ctx, &tips.OpUser, groupID); err != nil {
|
||||
return
|
||||
}
|
||||
g.Notification(ctx, mcontext.GetOpUserID(ctx), groupID, constant.GroupMessagePinnedNotification, tips)
|
||||
}
|
||||
|
||||
func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) {
|
||||
var err error
|
||||
defer func() {
|
||||
|
||||
271
internal/rpc/group/pinned_msg.go
Normal file
271
internal/rpc/group/pinned_msg.go
Normal file
@ -0,0 +1,271 @@
|
||||
// Copyright © 2026 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
||||
package group
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbgroup "github.com/openimsdk/protocol/group"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
)
|
||||
|
||||
// 群置顶消息相关 RPC 实现:
|
||||
// - 自动滚动保留最近 N 条置顶消息(N=model.GroupPinnedMsgMaxKeep,默认为 3)
|
||||
// - 置顶时把整条消息内容做完整快照存档,避免后续消息删除/撤回影响展示
|
||||
// - 每条置顶记录拥有唯一 pinID,作为 unpin 时的精准删除凭据
|
||||
// - 权限:默认全员可置顶;当 group.AllowPinMsg=1 时,仅群主/管理员可置顶或取消置顶
|
||||
|
||||
const (
|
||||
groupPinnedActionPin = int32(1)
|
||||
groupPinnedActionUnpin = int32(2)
|
||||
)
|
||||
|
||||
// PinGroupMessage 群聊中置顶单条消息
|
||||
func (s *groupServer) PinGroupMessage(ctx context.Context, req *pbgroup.PinGroupMessageReq) (*pbgroup.PinGroupMessageResp, error) {
|
||||
if req.GroupID == "" {
|
||||
return nil, errs.ErrArgs.WrapMsg("groupID empty")
|
||||
}
|
||||
if req.Seq <= 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("seq must be positive")
|
||||
}
|
||||
|
||||
group, err := s.db.TakeGroup(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if group.Status == constant.GroupStatusDismissed {
|
||||
return nil, servererrs.ErrDismissedAlready.Wrap()
|
||||
}
|
||||
|
||||
if err := s.checkPinPermission(ctx, group); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
|
||||
msgData, err := s.msgClient.GetSingleMsgBySeq(ctx, conversationID, req.Seq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msgData == nil {
|
||||
return nil, servererrs.ErrRecordNotFound.WrapMsg("message not found by seq")
|
||||
}
|
||||
if msgData.GroupID != "" && msgData.GroupID != req.GroupID {
|
||||
return nil, errs.ErrArgs.WrapMsg("seq does not belong to this group")
|
||||
}
|
||||
if msgData.Status >= constant.MsgStatusHasDeleted {
|
||||
return nil, servererrs.ErrRecordNotFound.WrapMsg("message has been deleted")
|
||||
}
|
||||
|
||||
pin := buildPinSnapshot(req.GroupID, conversationID, mcontext.GetOpUserID(ctx), msgData)
|
||||
|
||||
pinnedList, err := s.db.PinGroupMessage(ctx, req.GroupID, pin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pbPinned := pinnedMsgDB2PB(pin)
|
||||
pbList := pinnedListDB2PB(pinnedList)
|
||||
|
||||
s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionPin, pbPinned, pbList)
|
||||
|
||||
return &pbgroup.PinGroupMessageResp{
|
||||
PinnedMsg: pbPinned,
|
||||
PinnedList: pbList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UnpinGroupMessage 群聊中取消置顶单条消息(pinID 优先;为空则按 seq)
|
||||
func (s *groupServer) UnpinGroupMessage(ctx context.Context, req *pbgroup.UnpinGroupMessageReq) (*pbgroup.UnpinGroupMessageResp, error) {
|
||||
if req.GroupID == "" {
|
||||
return nil, errs.ErrArgs.WrapMsg("groupID empty")
|
||||
}
|
||||
if req.PinID == "" && req.Seq <= 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided")
|
||||
}
|
||||
|
||||
group, err := s.db.TakeGroup(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if group.Status == constant.GroupStatusDismissed {
|
||||
return nil, servererrs.ErrDismissedAlready.Wrap()
|
||||
}
|
||||
if err := s.checkPinPermission(ctx, group); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
current, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var target *model.GroupPinnedMessage
|
||||
for _, m := range current {
|
||||
if req.PinID != "" {
|
||||
if m.PinID == req.PinID {
|
||||
target = m
|
||||
break
|
||||
}
|
||||
} else if m.Seq == req.Seq {
|
||||
target = m
|
||||
break
|
||||
}
|
||||
}
|
||||
if target == nil {
|
||||
return nil, servererrs.ErrRecordNotFound.WrapMsg("pinned message not found")
|
||||
}
|
||||
|
||||
pinnedList, err := s.db.UnpinGroupMessage(ctx, req.GroupID, req.PinID, req.Seq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pbPinned := pinnedMsgDB2PB(target)
|
||||
pbList := pinnedListDB2PB(pinnedList)
|
||||
|
||||
s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionUnpin, pbPinned, pbList)
|
||||
|
||||
return &pbgroup.UnpinGroupMessageResp{PinnedList: pbList}, nil
|
||||
}
|
||||
|
||||
// GetGroupPinnedMessages 获取群置顶消息列表
|
||||
func (s *groupServer) GetGroupPinnedMessages(ctx context.Context, req *pbgroup.GetGroupPinnedMessagesReq) (*pbgroup.GetGroupPinnedMessagesResp, error) {
|
||||
if req.GroupID == "" {
|
||||
return nil, errs.ErrArgs.WrapMsg("groupID empty")
|
||||
}
|
||||
if err := s.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pinnedList, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbgroup.GetGroupPinnedMessagesResp{
|
||||
PinnedList: pinnedListDB2PB(pinnedList),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// checkPinPermission 校验当前操作者是否具备群消息置顶权限
|
||||
func (s *groupServer) checkPinPermission(ctx context.Context, group *model.Group) error {
|
||||
if authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
|
||||
return nil
|
||||
}
|
||||
opUserID := mcontext.GetOpUserID(ctx)
|
||||
if opUserID == "" {
|
||||
return errs.ErrNoPermission.WrapMsg("op user id empty")
|
||||
}
|
||||
member, err := s.db.TakeGroupMember(ctx, group.GroupID, opUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
isOwnerOrAdmin := member.RoleLevel == constant.GroupOwner || member.RoleLevel == constant.GroupAdmin
|
||||
if group.AllowPinMsg == model.GroupPermAdminOnly && !isOwnerOrAdmin {
|
||||
return errs.ErrNoPermission.WrapMsg("only owner or admin can pin/unpin group message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildPinSnapshot 把 sdkws.MsgData 完整快照成 GroupPinnedMessage
|
||||
// PinID 在 mgo 层 Pin 时若为空会自动生成;这里留空交由存储层处理
|
||||
func buildPinSnapshot(groupID, conversationID, opUserID string, m *sdkws.MsgData) *model.GroupPinnedMessage {
|
||||
pin := &model.GroupPinnedMessage{
|
||||
GroupID: groupID,
|
||||
ConversationID: conversationID,
|
||||
Seq: m.Seq,
|
||||
ServerMsgID: m.ServerMsgID,
|
||||
ClientMsgID: m.ClientMsgID,
|
||||
SendID: m.SendID,
|
||||
RecvID: m.RecvID,
|
||||
SenderPlatformID: m.SenderPlatformID,
|
||||
SenderNickname: m.SenderNickname,
|
||||
SenderFaceURL: m.SenderFaceURL,
|
||||
SessionType: m.SessionType,
|
||||
MsgFrom: m.MsgFrom,
|
||||
ContentType: m.ContentType,
|
||||
Content: string(m.Content),
|
||||
AtUserIDList: append([]string(nil), m.AtUserIDList...),
|
||||
Options: copyOptions(m.Options),
|
||||
AttachedInfo: m.AttachedInfo,
|
||||
Ex: m.Ex,
|
||||
SendTime: m.SendTime,
|
||||
CreateTime: m.CreateTime,
|
||||
Status: m.Status,
|
||||
PinUserID: opUserID,
|
||||
PinTime: time.Now().UnixMilli(),
|
||||
}
|
||||
if m.OfflinePushInfo != nil {
|
||||
pin.OfflinePush = &model.GroupPinnedOfflinePush{
|
||||
Title: m.OfflinePushInfo.Title,
|
||||
Desc: m.OfflinePushInfo.Desc,
|
||||
Ex: m.OfflinePushInfo.Ex,
|
||||
IOSPushSound: m.OfflinePushInfo.IOSPushSound,
|
||||
IOSBadgeCount: m.OfflinePushInfo.IOSBadgeCount,
|
||||
SignalInfo: m.OfflinePushInfo.SignalInfo,
|
||||
}
|
||||
}
|
||||
return pin
|
||||
}
|
||||
|
||||
func copyOptions(src map[string]bool) map[string]bool {
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
dst := make(map[string]bool, len(src))
|
||||
for k, v := range src {
|
||||
dst[k] = v
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func pinnedMsgDB2PB(m *model.GroupPinnedMessage) *sdkws.GroupPinnedMsgInfo {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
return &sdkws.GroupPinnedMsgInfo{
|
||||
PinID: m.PinID,
|
||||
GroupID: m.GroupID,
|
||||
ConversationID: m.ConversationID,
|
||||
Seq: m.Seq,
|
||||
ServerMsgID: m.ServerMsgID,
|
||||
ClientMsgID: m.ClientMsgID,
|
||||
SendID: m.SendID,
|
||||
RecvID: m.RecvID,
|
||||
SenderPlatformID: m.SenderPlatformID,
|
||||
SenderNickname: m.SenderNickname,
|
||||
SenderFaceURL: m.SenderFaceURL,
|
||||
SessionType: m.SessionType,
|
||||
MsgFrom: m.MsgFrom,
|
||||
ContentType: m.ContentType,
|
||||
Content: m.Content,
|
||||
AtUserIDList: append([]string(nil), m.AtUserIDList...),
|
||||
Options: copyOptions(m.Options),
|
||||
AttachedInfo: m.AttachedInfo,
|
||||
Ex: m.Ex,
|
||||
SendTime: m.SendTime,
|
||||
CreateTime: m.CreateTime,
|
||||
Status: m.Status,
|
||||
PinUserID: m.PinUserID,
|
||||
PinTime: m.PinTime,
|
||||
}
|
||||
}
|
||||
|
||||
func pinnedListDB2PB(list []*model.GroupPinnedMessage) []*sdkws.GroupPinnedMsgInfo {
|
||||
if len(list) == 0 {
|
||||
return nil
|
||||
}
|
||||
result := make([]*sdkws.GroupPinnedMsgInfo, 0, len(list))
|
||||
for _, m := range list {
|
||||
result = append(result, pinnedMsgDB2PB(m))
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -34,6 +34,10 @@ import (
|
||||
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
||||
if req.MsgData != nil {
|
||||
m.encapsulateMsgData(req.MsgData)
|
||||
// 全局账号状态校验:冻结/黑名单用户不可收发消息
|
||||
if err := m.verifyUserStatus(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch req.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
return m.sendMsgSingleChat(ctx, req)
|
||||
|
||||
@ -71,6 +71,7 @@ type msgServer struct {
|
||||
webhookClient *webhook.Client
|
||||
conversationClient *rpcli.ConversationClient
|
||||
spamReportDB database.SpamReport
|
||||
globalBlackDB controller.UserGlobalBlackDatabase
|
||||
}
|
||||
|
||||
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
|
||||
@ -127,6 +128,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
globalBlackMgo, err := mgo.NewUserGlobalBlackMongo(mgocli.GetDB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s := &msgServer{
|
||||
MsgDatabase: msgDatabase,
|
||||
RegisterCenter: client,
|
||||
@ -138,6 +143,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||
conversationClient: conversationClient,
|
||||
spamReportDB: spamReportDB,
|
||||
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
|
||||
}
|
||||
|
||||
s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg))
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
@ -50,6 +51,43 @@ type MessageRevoked struct {
|
||||
Seq uint32 `json:"seq"`
|
||||
}
|
||||
|
||||
// verifyUserStatus 校验发送方/接收方的全局账号状态。
|
||||
// 任意一方处于冻结(1)或黑名单(2)即拒绝消息发送/投递。
|
||||
// 通知类消息(NotificationBegin~NotificationEnd)和管理员发送方放行。
|
||||
func (m *msgServer) verifyUserStatus(ctx context.Context, data *msg.SendMsgReq) error {
|
||||
if data == nil || data.MsgData == nil {
|
||||
return nil
|
||||
}
|
||||
if data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType <= constant.NotificationEnd {
|
||||
return nil
|
||||
}
|
||||
sendID := data.MsgData.SendID
|
||||
if datautil.Contain(sendID, m.config.Share.IMAdminUserID...) {
|
||||
return nil
|
||||
}
|
||||
if sendID != "" {
|
||||
st, err := m.globalBlackDB.GetStatus(ctx, sendID)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "verifyUserStatus: GetStatus(send) failed", err, "sendID", sendID)
|
||||
} else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist {
|
||||
return servererrs.ErrUserBlocked.WithDetail("sender is restricted, status=" + strconv.Itoa(int(st)))
|
||||
}
|
||||
}
|
||||
// 单聊:同时校验接收方状态;群聊接收方拦截在推送层处理
|
||||
if data.MsgData.SessionType == constant.SingleChatType {
|
||||
recvID := data.MsgData.RecvID
|
||||
if recvID != "" && !datautil.Contain(recvID, m.config.Share.IMAdminUserID...) {
|
||||
st, err := m.globalBlackDB.GetStatus(ctx, recvID)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "verifyUserStatus: GetStatus(recv) failed", err, "recvID", recvID)
|
||||
} else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist {
|
||||
return servererrs.ErrMsgReceiveNotAllowed.WrapMsg("receiver is restricted")
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error {
|
||||
switch data.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
|
||||
@ -697,7 +697,7 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT
|
||||
if in1 {
|
||||
return nil, servererrs.ErrRelationshipAlready.WrapMsg("already in friend list")
|
||||
}
|
||||
if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway, req.Remark); err != nil {
|
||||
if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -126,6 +126,11 @@ type GroupDatabase interface {
|
||||
FindJoinGroupID(ctx context.Context, userID string) ([]string, error)
|
||||
|
||||
GetGroupApplicationUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error)
|
||||
|
||||
// 群置顶消息:保留最近 N 条
|
||||
PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error)
|
||||
UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error)
|
||||
GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error)
|
||||
}
|
||||
|
||||
func NewGroupDatabase(
|
||||
@ -134,24 +139,39 @@ func NewGroupDatabase(
|
||||
groupDB database.Group,
|
||||
groupMemberDB database.GroupMember,
|
||||
groupRequestDB database.GroupRequest,
|
||||
groupPinnedMsgDB database.GroupPinnedMsg,
|
||||
ctxTx tx.Tx,
|
||||
groupHash cache.GroupHash,
|
||||
) GroupDatabase {
|
||||
return &groupDatabase{
|
||||
groupDB: groupDB,
|
||||
groupMemberDB: groupMemberDB,
|
||||
groupRequestDB: groupRequestDB,
|
||||
ctxTx: ctxTx,
|
||||
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
|
||||
groupDB: groupDB,
|
||||
groupMemberDB: groupMemberDB,
|
||||
groupRequestDB: groupRequestDB,
|
||||
groupPinnedMsgDB: groupPinnedMsgDB,
|
||||
ctxTx: ctxTx,
|
||||
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
|
||||
}
|
||||
}
|
||||
|
||||
type groupDatabase struct {
|
||||
groupDB database.Group
|
||||
groupMemberDB database.GroupMember
|
||||
groupRequestDB database.GroupRequest
|
||||
ctxTx tx.Tx
|
||||
cache cache.GroupCache
|
||||
groupDB database.Group
|
||||
groupMemberDB database.GroupMember
|
||||
groupRequestDB database.GroupRequest
|
||||
groupPinnedMsgDB database.GroupPinnedMsg
|
||||
ctxTx tx.Tx
|
||||
cache cache.GroupCache
|
||||
}
|
||||
|
||||
func (g *groupDatabase) PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) {
|
||||
return g.groupPinnedMsgDB.Pin(ctx, groupID, msg)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) {
|
||||
return g.groupPinnedMsgDB.Unpin(ctx, groupID, pinID, seq)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) {
|
||||
return g.groupPinnedMsgDB.Get(ctx, groupID)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) {
|
||||
|
||||
@ -14,8 +14,10 @@ type UserGlobalBlackDatabase interface {
|
||||
AddBlack(ctx context.Context, blacks []*model.UserGlobalBlack) error
|
||||
// RemoveBlack 按 userID 将用户从全局黑名单移除
|
||||
RemoveBlack(ctx context.Context, userIDs []string) error
|
||||
// IsBlocked 检查用户是否在全局黑名单
|
||||
// IsBlocked 检查用户是否在全局黑名单(含冻结)
|
||||
IsBlocked(ctx context.Context, userID string) (bool, error)
|
||||
// GetStatus 返回用户限制状态:0=正常,1=冻结,2=黑名单
|
||||
GetStatus(ctx context.Context, userID string) (int32, error)
|
||||
// FindBlocked 批量查询哪些 userID 在全局黑名单中,返回被封禁的记录
|
||||
FindBlocked(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error)
|
||||
// GetBlackList 分页获取黑名单列表
|
||||
@ -42,6 +44,10 @@ func (u *userGlobalBlackDatabase) IsBlocked(ctx context.Context, userID string)
|
||||
return u.db.IsBlocked(ctx, userID)
|
||||
}
|
||||
|
||||
func (u *userGlobalBlackDatabase) GetStatus(ctx context.Context, userID string) (int32, error) {
|
||||
return u.db.GetStatus(ctx, userID)
|
||||
}
|
||||
|
||||
func (u *userGlobalBlackDatabase) GetBlackList(ctx context.Context, pagination pagination.Pagination) (int64, []*model.UserGlobalBlack, error) {
|
||||
return u.db.Page(ctx, pagination)
|
||||
}
|
||||
|
||||
22
pkg/common/storage/database/group_pinned_msg.go
Normal file
22
pkg/common/storage/database/group_pinned_msg.go
Normal file
@ -0,0 +1,22 @@
|
||||
// Copyright © 2026 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
)
|
||||
|
||||
// GroupPinnedMsg 群置顶消息的存储抽象
|
||||
type GroupPinnedMsg interface {
|
||||
// Pin 置顶一条消息:若 PinID 为空会自动生成;自动滚动保留最近 N 条
|
||||
Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error)
|
||||
// Unpin 取消置顶;pinID 非空时按 pinID 精确删除,否则按 seq 删除
|
||||
Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error)
|
||||
// Get 获取群置顶消息列表(最新的在前)
|
||||
Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error)
|
||||
}
|
||||
115
pkg/common/storage/database/mgo/group_pinned_msg.go
Normal file
115
pkg/common/storage/database/mgo/group_pinned_msg.go
Normal file
@ -0,0 +1,115 @@
|
||||
// Copyright © 2026 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
func NewGroupPinnedMsgMongo(db *mongo.Database) (database.GroupPinnedMsg, error) {
|
||||
coll := db.Collection(database.GroupPinnedMsgName)
|
||||
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
|
||||
Keys: bson.D{{Key: "group_id", Value: 1}},
|
||||
Options: options.Index().SetUnique(true),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return &groupPinnedMsgMgo{coll: coll}, nil
|
||||
}
|
||||
|
||||
type groupPinnedMsgMgo struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (g *groupPinnedMsgMgo) get(ctx context.Context, groupID string) (*model.GroupPinnedMsg, error) {
|
||||
doc, err := mongoutil.FindOne[*model.GroupPinnedMsg](ctx, g.coll, bson.M{"group_id": groupID})
|
||||
if err != nil {
|
||||
if errs.ErrRecordNotFound.Is(err) || errors.Is(err, mongo.ErrNoDocuments) {
|
||||
return &model.GroupPinnedMsg{GroupID: groupID}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
func (g *groupPinnedMsgMgo) Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) {
|
||||
doc, err := g.get(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return doc.PinnedMsgs, nil
|
||||
}
|
||||
|
||||
// Pin 置顶一条消息:
|
||||
// - 若提供的 msg.PinID 为空,则自动生成 ObjectID().Hex()
|
||||
// - 同 seq 的旧记录会被先移除避免重复
|
||||
// - 新记录 push 到数组首位,自动滚动保留最近 GroupPinnedMsgMaxKeep 条
|
||||
func (g *groupPinnedMsgMgo) Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) {
|
||||
if msg == nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("pin msg is nil")
|
||||
}
|
||||
if msg.PinID == "" {
|
||||
msg.PinID = primitive.NewObjectID().Hex()
|
||||
}
|
||||
msg.GroupID = groupID
|
||||
|
||||
if _, err := mongoutil.UpdateOneResult(ctx, g.coll,
|
||||
bson.M{"group_id": groupID},
|
||||
bson.M{"$pull": bson.M{"pinned_msgs": bson.M{"seq": msg.Seq}}},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filter := bson.M{"group_id": groupID}
|
||||
update := bson.M{
|
||||
"$push": bson.M{
|
||||
"pinned_msgs": bson.M{
|
||||
"$each": bson.A{msg},
|
||||
"$position": 0,
|
||||
"$slice": model.GroupPinnedMsgMaxKeep,
|
||||
},
|
||||
},
|
||||
"$setOnInsert": bson.M{"group_id": groupID},
|
||||
}
|
||||
opts := options.Update().SetUpsert(true)
|
||||
if _, err := g.coll.UpdateOne(ctx, filter, update, opts); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return g.Get(ctx, groupID)
|
||||
}
|
||||
|
||||
// Unpin 取消置顶:
|
||||
// - pinID 非空时按 pinID 精确删除(推荐)
|
||||
// - 否则按 seq 删除
|
||||
// 返回更新后的置顶列表(可能为空数组)
|
||||
func (g *groupPinnedMsgMgo) Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) {
|
||||
if pinID == "" && seq <= 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided")
|
||||
}
|
||||
pull := bson.M{}
|
||||
if pinID != "" {
|
||||
pull["pin_id"] = pinID
|
||||
} else {
|
||||
pull["seq"] = seq
|
||||
}
|
||||
if _, err := mongoutil.UpdateOneResult(ctx, g.coll,
|
||||
bson.M{"group_id": groupID},
|
||||
bson.M{"$pull": bson.M{"pinned_msgs": pull}},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return g.Get(ctx, groupID)
|
||||
}
|
||||
@ -37,7 +37,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
|
||||
b.CreateTime = time.Now()
|
||||
}
|
||||
}
|
||||
// 使用 upsert 避免重复插入报错
|
||||
// 使用 upsert 避免重复插入报错;status 也走 $set 以便升级/降级(冻结↔黑名单)时同步更新
|
||||
for _, b := range blacks {
|
||||
filter := bson.M{"user_id": b.UserID}
|
||||
update := bson.M{
|
||||
@ -45,6 +45,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
|
||||
"nickname": b.Nickname,
|
||||
"operator_id": b.OperatorID,
|
||||
"reason": b.Reason,
|
||||
"status": b.Status,
|
||||
},
|
||||
"$setOnInsert": bson.M{
|
||||
"user_id": b.UserID,
|
||||
@ -59,6 +60,20 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStatus 返回 userID 对应的限制状态:
|
||||
// 0=正常(无记录),1=冻结,2=黑名单
|
||||
func (u *UserGlobalBlackMgo) GetStatus(ctx context.Context, userID string) (int32, error) {
|
||||
var doc model.UserGlobalBlack
|
||||
err := u.coll.FindOne(ctx, bson.M{"user_id": userID}, options.FindOne().SetProjection(bson.M{"status": 1})).Decode(&doc)
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return model.UserStatusNormal, nil
|
||||
}
|
||||
return model.UserStatusNormal, errs.Wrap(err)
|
||||
}
|
||||
return doc.Status, nil
|
||||
}
|
||||
|
||||
func (u *UserGlobalBlackMgo) Remove(ctx context.Context, users []string) error {
|
||||
if len(users) == 0 {
|
||||
return nil
|
||||
|
||||
@ -12,6 +12,7 @@ const (
|
||||
GroupJoinVersionName = "group_join_version"
|
||||
ConversationVersionName = "conversation_version"
|
||||
GroupRequestName = "group_request"
|
||||
GroupPinnedMsgName = "group_pinned_msg"
|
||||
LogName = "log"
|
||||
ObjectName = "s3"
|
||||
UserName = "user"
|
||||
|
||||
@ -17,6 +17,8 @@ type UserGlobalBlack interface {
|
||||
Find(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error)
|
||||
// IsBlocked 检查单个用户是否在黑名单
|
||||
IsBlocked(ctx context.Context, userID string) (bool, error)
|
||||
// GetStatus 返回用户限制状态:0=正常,1=冻结,2=黑名单
|
||||
GetStatus(ctx context.Context, userID string) (int32, error)
|
||||
// Page 分页查询黑名单列表
|
||||
Page(ctx context.Context, pagination pagination.Pagination) (count int64, blacks []*model.UserGlobalBlack, err error)
|
||||
}
|
||||
|
||||
69
pkg/common/storage/model/group_pinned_msg.go
Normal file
69
pkg/common/storage/model/group_pinned_msg.go
Normal file
@ -0,0 +1,69 @@
|
||||
// Copyright © 2026 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
||||
package model
|
||||
|
||||
// GroupPinnedMsgMaxKeep 群置顶消息最多保留的条数(最新置顶的在最前)
|
||||
const GroupPinnedMsgMaxKeep = 3
|
||||
|
||||
// GroupPinnedOfflinePush 离线推送信息快照
|
||||
type GroupPinnedOfflinePush struct {
|
||||
Title string `bson:"title"`
|
||||
Desc string `bson:"desc"`
|
||||
Ex string `bson:"ex"`
|
||||
IOSPushSound string `bson:"ios_push_sound"`
|
||||
IOSBadgeCount bool `bson:"ios_badge_count"`
|
||||
SignalInfo string `bson:"signal_info"`
|
||||
}
|
||||
|
||||
// GroupPinnedMessage 一条群置顶消息的完整内容快照
|
||||
// 置顶时把消息整体快照入库,避免后续消息删除/撤回影响已置顶展示
|
||||
type GroupPinnedMessage struct {
|
||||
// PinID 全局唯一 id,用于精准取消置顶(生产由 mongo ObjectID().Hex() 生成)
|
||||
PinID string `bson:"pin_id"`
|
||||
|
||||
// 会话 / 群信息
|
||||
ConversationID string `bson:"conversation_id"`
|
||||
GroupID string `bson:"group_id"`
|
||||
|
||||
// 消息标识
|
||||
Seq int64 `bson:"seq"`
|
||||
ServerMsgID string `bson:"server_msg_id"`
|
||||
ClientMsgID string `bson:"client_msg_id"`
|
||||
|
||||
// 发送方信息
|
||||
SendID string `bson:"send_id"`
|
||||
RecvID string `bson:"recv_id"`
|
||||
SenderPlatformID int32 `bson:"sender_platform_id"`
|
||||
SenderNickname string `bson:"sender_nickname"`
|
||||
SenderFaceURL string `bson:"sender_face_url"`
|
||||
|
||||
// 消息内容快照
|
||||
SessionType int32 `bson:"session_type"`
|
||||
MsgFrom int32 `bson:"msg_from"`
|
||||
ContentType int32 `bson:"content_type"`
|
||||
Content string `bson:"content"`
|
||||
AtUserIDList []string `bson:"at_user_id_list"`
|
||||
Options map[string]bool `bson:"options"`
|
||||
AttachedInfo string `bson:"attached_info"`
|
||||
Ex string `bson:"ex"`
|
||||
|
||||
OfflinePush *GroupPinnedOfflinePush `bson:"offline_push"`
|
||||
|
||||
// 时间
|
||||
SendTime int64 `bson:"send_time"`
|
||||
CreateTime int64 `bson:"create_time"`
|
||||
Status int32 `bson:"status"`
|
||||
|
||||
// 操作人 & 时间
|
||||
PinUserID string `bson:"pin_user_id"`
|
||||
PinTime int64 `bson:"pin_time"`
|
||||
}
|
||||
|
||||
// GroupPinnedMsg 一个群的置顶消息文档,按 group_id 唯一
|
||||
type GroupPinnedMsg struct {
|
||||
GroupID string `bson:"group_id"`
|
||||
PinnedMsgs []*GroupPinnedMessage `bson:"pinned_msgs"`
|
||||
}
|
||||
@ -42,6 +42,14 @@ const (
|
||||
MsgReceiveSettingNobody int32 = 2
|
||||
)
|
||||
|
||||
// UserStatus 用户账号状态枚举。
|
||||
// 0=正常;1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线,不能收发消息)
|
||||
const (
|
||||
UserStatusNormal int32 = 0
|
||||
UserStatusFrozen int32 = 1
|
||||
UserStatusBlacklist int32 = 2
|
||||
)
|
||||
|
||||
type User struct {
|
||||
UserID string `bson:"user_id"`
|
||||
Nickname string `bson:"nickname"`
|
||||
@ -60,6 +68,8 @@ type User struct {
|
||||
MsgReceiveSetting int32 `bson:"msg_receive_setting"`
|
||||
// CallRingtoneURL 用户自定义来电铃声 URL;对方来电时播放此铃声
|
||||
CallRingtoneURL string `bson:"call_ringtone_url"`
|
||||
// Status 账号状态:0=正常,1=冻结,2=黑名单
|
||||
Status int32 `bson:"status"`
|
||||
}
|
||||
|
||||
func (u *User) GetNickname() string {
|
||||
|
||||
@ -2,11 +2,14 @@ package model
|
||||
|
||||
import "time"
|
||||
|
||||
// UserGlobalBlack 全局黑名单记录,被加入黑名单的用户无法登录
|
||||
// UserGlobalBlack 全局黑名单/冻结记录。
|
||||
// Status: 1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线,不能收发消息)
|
||||
type UserGlobalBlack struct {
|
||||
UserID string `bson:"user_id"`
|
||||
Nickname string `bson:"nickname"`
|
||||
OperatorID string `bson:"operator_id"`
|
||||
Reason string `bson:"reason"`
|
||||
CreateTime time.Time `bson:"create_time"`
|
||||
// Status 限制类型:1=冻结,2=黑名单
|
||||
Status int32 `bson:"status"`
|
||||
}
|
||||
|
||||
@ -75,6 +75,29 @@ func (x *MsgClient) GetActiveConversation(ctx context.Context, conversationIDs [
|
||||
return extractField(ctx, x.MsgClient.GetActiveConversation, req, (*msg.GetActiveConversationResp).GetConversations)
|
||||
}
|
||||
|
||||
// GetSingleMsgBySeq 根据会话 ID 与 seq 拉取一条消息(不存在时返回 nil)
|
||||
func (x *MsgClient) GetSingleMsgBySeq(ctx context.Context, conversationID string, seq int64) (*sdkws.MsgData, error) {
|
||||
if conversationID == "" || seq <= 0 {
|
||||
return nil, nil
|
||||
}
|
||||
req := &msg.GetMsgByConversationIDsReq{
|
||||
ConversationIDs: []string{conversationID},
|
||||
MaxSeqs: map[string]int64{conversationID: seq},
|
||||
}
|
||||
resp, err := x.MsgClient.GetMsgByConversationIDs(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := resp.GetMsgDatas()
|
||||
if len(m) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if v, ok := m[conversationID]; ok && v != nil && v.Seq == seq {
|
||||
return v, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (x *MsgClient) GetSeqMessage(ctx context.Context, userID string, conversations []*msg.ConversationSeqs) (map[string]*sdkws.PullMsgs, error) {
|
||||
if len(conversations) == 0 {
|
||||
return nil, nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user