mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	Merge remote-tracking branch 'origin/main' into list
# Conflicts: # internal/rpc/group/notification.go
This commit is contained in:
		
						commit
						c73f176b97
					
				@ -13,6 +13,9 @@ afterUpdateUserInfoEx:
 | 
			
		||||
afterSendSingleMsg:
 | 
			
		||||
  enable: false
 | 
			
		||||
  timeout: 5
 | 
			
		||||
  # Only the senID/recvID specified in attentionIds will send the callback
 | 
			
		||||
  # if not set, all user messages will be callback
 | 
			
		||||
  attentionIds: []
 | 
			
		||||
beforeSendGroupMsg:
 | 
			
		||||
  enable: false
 | 
			
		||||
  timeout: 5
 | 
			
		||||
 | 
			
		||||
@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
 | 
			
		||||
			SendTime:         params.SendTime,
 | 
			
		||||
			Options:          options,
 | 
			
		||||
			OfflinePushInfo:  params.OfflinePushInfo,
 | 
			
		||||
			Ex:               params.Ex,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	return &pbData
 | 
			
		||||
 | 
			
		||||
@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
 | 
			
		||||
		userRpcClient:  &userRpcClient,
 | 
			
		||||
		RegisterCenter: client,
 | 
			
		||||
		authDatabase: controller.NewAuthDatabase(
 | 
			
		||||
			redis2.NewTokenCacheModel(rdb),
 | 
			
		||||
			redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire),
 | 
			
		||||
			config.Share.Secret,
 | 
			
		||||
			config.RpcConfig.TokenPolicy.Expire,
 | 
			
		||||
		),
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,9 @@ import (
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
	pbgroup "github.com/openimsdk/protocol/group"
 | 
			
		||||
	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
@ -36,6 +38,12 @@ import (
 | 
			
		||||
	"github.com/openimsdk/tools/utils/stringutil"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// GroupApplicationReceiver
 | 
			
		||||
const (
 | 
			
		||||
	applicantReceiver = iota
 | 
			
		||||
	adminReceiver
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
 | 
			
		||||
	return &GroupNotificationSender{
 | 
			
		||||
		NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
 | 
			
		||||
@ -418,15 +426,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
 | 
			
		||||
	var opUser *sdkws.GroupMemberFullInfo
 | 
			
		||||
	if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	for _, userID := range append(userIDs, req.FromUserID) {
 | 
			
		||||
		tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
 | 
			
		||||
		if userID == req.FromUserID {
 | 
			
		||||
			tips.ReceiverAs = 0
 | 
			
		||||
			tips.ReceiverAs = applicantReceiver
 | 
			
		||||
		} else {
 | 
			
		||||
			tips.ReceiverAs = 1
 | 
			
		||||
			tips.ReceiverAs = adminReceiver
 | 
			
		||||
		}
 | 
			
		||||
		g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
 | 
			
		||||
	}
 | 
			
		||||
@ -449,15 +459,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
 | 
			
		||||
	var opUser *sdkws.GroupMemberFullInfo
 | 
			
		||||
	if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	for _, userID := range append(userIDs, req.FromUserID) {
 | 
			
		||||
		tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
 | 
			
		||||
		if userID == req.FromUserID {
 | 
			
		||||
			tips.ReceiverAs = 0
 | 
			
		||||
			tips.ReceiverAs = applicantReceiver
 | 
			
		||||
		} else {
 | 
			
		||||
			tips.ReceiverAs = 1
 | 
			
		||||
			tips.ReceiverAs = adminReceiver
 | 
			
		||||
		}
 | 
			
		||||
		g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config
 | 
			
		||||
	if msg.MsgData.ContentType == constant.Typing {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// According to the attentionIds configuration, only some users are sent
 | 
			
		||||
	attentionIds := after.AttentionIds
 | 
			
		||||
	if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
 | 
			
		||||
		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
 | 
			
		||||
		RecvID:            msg.MsgData.RecvID,
 | 
			
		||||
 | 
			
		||||
@ -55,6 +55,9 @@ type SendMsg struct {
 | 
			
		||||
 | 
			
		||||
	// OfflinePushInfo contains information for offline push notifications.
 | 
			
		||||
	OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
 | 
			
		||||
 | 
			
		||||
	// Ex stores extended fields
 | 
			
		||||
	Ex string `json:"ex"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat.
 | 
			
		||||
 | 
			
		||||
@ -341,6 +341,7 @@ type BeforeConfig struct {
 | 
			
		||||
type AfterConfig struct {
 | 
			
		||||
	Enable       bool  `mapstructure:"enable"`
 | 
			
		||||
	Timeout      int   `mapstructure:"timeout"`
 | 
			
		||||
	AttentionIds []string `mapstructure:"attentionIds"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Share struct {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										28
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										28
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							@ -21,22 +21,36 @@ import (
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/stringutil"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type tokenCache struct {
 | 
			
		||||
	rdb          redis.UniversalClient
 | 
			
		||||
	accessExpire time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel {
 | 
			
		||||
	return &tokenCache{
 | 
			
		||||
		rdb: rdb,
 | 
			
		||||
	}
 | 
			
		||||
func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel {
 | 
			
		||||
	c := &tokenCache{rdb: rdb}
 | 
			
		||||
	c.accessExpire = c.getExpireTime(accessExpire)
 | 
			
		||||
	return c
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
 | 
			
		||||
func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
 | 
			
		||||
	return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetTokenFlagEx set token and flag with expire time
 | 
			
		||||
func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error {
 | 
			
		||||
	key := cachekey.GetTokenKey(userID, platformID)
 | 
			
		||||
	if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
 | 
			
		||||
	m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
 | 
			
		||||
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
 | 
			
		||||
	return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *tokenCache) getExpireTime(t int64) time.Duration {
 | 
			
		||||
	return time.Hour * 24 * time.Duration(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								pkg/common/storage/cache/token.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								pkg/common/storage/cache/token.go
									
									
									
									
										vendored
									
									
								
							@ -5,7 +5,9 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type TokenModel interface {
 | 
			
		||||
	AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
 | 
			
		||||
	SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
 | 
			
		||||
	// SetTokenFlagEx set token and flag with expire time
 | 
			
		||||
	SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error
 | 
			
		||||
	GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
 | 
			
		||||
	SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
 | 
			
		||||
	DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
 | 
			
		||||
 | 
			
		||||
@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
 | 
			
		||||
 | 
			
		||||
// Create Token.
 | 
			
		||||
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
 | 
			
		||||
	isCreate := true // flag is create or update
 | 
			
		||||
	tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
 | 
			
		||||
		if err != nil || v != constant.NormalToken {
 | 
			
		||||
			deleteTokenKey = append(deleteTokenKey, k)
 | 
			
		||||
		}
 | 
			
		||||
		if v == constant.NormalToken {
 | 
			
		||||
			isCreate = false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(deleteTokenKey) != 0 {
 | 
			
		||||
		err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
 | 
			
		||||
@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", errs.WrapMsg(err, "token.SignedString")
 | 
			
		||||
	}
 | 
			
		||||
	return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
 | 
			
		||||
 | 
			
		||||
	if isCreate {
 | 
			
		||||
		// should create,should specify expiration time
 | 
			
		||||
		if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
 | 
			
		||||
			return "", err
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		// should update
 | 
			
		||||
		if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
 | 
			
		||||
			return "", err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return tokenString, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user