mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-25 19:22:46 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
3d46297aa4
@ -38,6 +38,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
|||||||
}
|
}
|
||||||
conversationDB := relation.NewConversationGorm(db)
|
conversationDB := relation.NewConversationGorm(db)
|
||||||
pbConversation.RegisterConversationServer(server, &conversationServer{
|
pbConversation.RegisterConversationServer(server, &conversationServer{
|
||||||
|
notify: notification.NewCheck(client),
|
||||||
groupChecker: check.NewGroupChecker(client),
|
groupChecker: check.NewGroupChecker(client),
|
||||||
ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
|
ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
|
||||||
})
|
})
|
||||||
|
@ -49,6 +49,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
|||||||
pbGroup.RegisterGroupServer(server, &groupServer{
|
pbGroup.RegisterGroupServer(server, &groupServer{
|
||||||
GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()),
|
GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()),
|
||||||
UserCheck: check.NewUserCheck(client),
|
UserCheck: check.NewUserCheck(client),
|
||||||
|
Notification: notification.NewCheck(client),
|
||||||
ConversationChecker: check.NewConversationChecker(client),
|
ConversationChecker: check.NewConversationChecker(client),
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
@ -152,6 +153,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
groupMember.JoinSource = constant.JoinByInvitation
|
groupMember.JoinSource = constant.JoinByInvitation
|
||||||
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
||||||
groupMember.JoinTime = time.Now()
|
groupMember.JoinTime = time.Now()
|
||||||
|
groupMember.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -313,6 +315,8 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
|||||||
member.OperatorUserID = opUserID
|
member.OperatorUserID = opUserID
|
||||||
member.InviterUserID = opUserID
|
member.InviterUserID = opUserID
|
||||||
member.JoinSource = constant.JoinByInvitation
|
member.JoinSource = constant.JoinByInvitation
|
||||||
|
member.JoinTime = time.Now()
|
||||||
|
member.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -640,6 +644,8 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
|
|||||||
groupMember.OperatorUserID = mcontext.GetOpUserID(ctx)
|
groupMember.OperatorUserID = mcontext.GetOpUserID(ctx)
|
||||||
groupMember.JoinSource = constant.JoinByInvitation
|
groupMember.JoinSource = constant.JoinByInvitation
|
||||||
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
||||||
|
groupMember.JoinTime = time.Now()
|
||||||
|
groupMember.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,9 @@ type ApiResponse struct {
|
|||||||
|
|
||||||
func isAllFieldsPrivate(v any) bool {
|
func isAllFieldsPrivate(v any) bool {
|
||||||
typeOf := reflect.TypeOf(v)
|
typeOf := reflect.TypeOf(v)
|
||||||
|
if typeOf == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if typeOf.Kind() == reflect.Ptr {
|
if typeOf.Kind() == reflect.Ptr {
|
||||||
typeOf = typeOf.Elem()
|
typeOf = typeOf.Elem()
|
||||||
}
|
}
|
||||||
|
1
pkg/common/db/cache/black.go
vendored
1
pkg/common/db/cache/black.go
vendored
@ -46,6 +46,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache {
|
|||||||
expireTime: b.expireTime,
|
expireTime: b.expireTime,
|
||||||
rcClient: b.rcClient,
|
rcClient: b.rcClient,
|
||||||
blackDB: b.blackDB,
|
blackDB: b.blackDB,
|
||||||
|
metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDeleteKeys()...),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
12
pkg/common/db/cache/conversation.go
vendored
12
pkg/common/db/cache/conversation.go
vendored
@ -68,7 +68,7 @@ func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) NewCache() ConversationCache {
|
func (c *ConversationRedisCache) NewCache() ConversationCache {
|
||||||
return &ConversationRedisCache{rcClient: c.rcClient, metaCache: c.metaCache, conversationDB: c.conversationDB, expireTime: c.expireTime}
|
return &ConversationRedisCache{rcClient: c.rcClient, metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDeleteKeys()...), conversationDB: c.conversationDB, expireTime: c.expireTime}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
|
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
|
||||||
@ -157,16 +157,6 @@ func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ow
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ConversationCache {
|
|
||||||
var keys []string
|
|
||||||
for _, conversationID := range conversationIDs {
|
|
||||||
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
|
|
||||||
}
|
|
||||||
cache := c.NewCache()
|
|
||||||
cache.AddKeys(keys...)
|
|
||||||
return cache
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
|
func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
|
||||||
return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
|
return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
|
||||||
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
|
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
|
||||||
|
2
pkg/common/db/cache/extend_msg_set.go
vendored
2
pkg/common/db/cache/extend_msg_set.go
vendored
@ -40,7 +40,7 @@ func NewExtendMsgSetCacheRedis(rdb redis.UniversalClient, extendMsgSetDB unrelat
|
|||||||
|
|
||||||
func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache {
|
func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache {
|
||||||
return &ExtendMsgSetCacheRedis{
|
return &ExtendMsgSetCacheRedis{
|
||||||
metaCache: e.metaCache,
|
metaCache: NewMetaCacheRedis(e.rcClient, e.metaCache.GetPreDeleteKeys()...),
|
||||||
expireTime: e.expireTime,
|
expireTime: e.expireTime,
|
||||||
extendMsgSetDB: e.extendMsgSetDB,
|
extendMsgSetDB: e.extendMsgSetDB,
|
||||||
rcClient: e.rcClient,
|
rcClient: e.rcClient,
|
||||||
|
2
pkg/common/db/cache/friend.go
vendored
2
pkg/common/db/cache/friend.go
vendored
@ -48,7 +48,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationTb.FriendMo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *FriendCacheRedis) NewCache() FriendCache {
|
func (c *FriendCacheRedis) NewCache() FriendCache {
|
||||||
return &FriendCacheRedis{rcClient: c.rcClient, metaCache: c.metaCache, friendDB: c.friendDB, expireTime: c.expireTime}
|
return &FriendCacheRedis{rcClient: c.rcClient, metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDeleteKeys()...), friendDB: c.friendDB, expireTime: c.expireTime}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
|
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
|
||||||
|
2
pkg/common/db/cache/group.go
vendored
2
pkg/common/db/cache/group.go
vendored
@ -70,7 +70,7 @@ func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModel
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCacheRedis) NewCache() GroupCache {
|
func (g *GroupCacheRedis) NewCache() GroupCache {
|
||||||
return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: g.metaCache}
|
return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDeleteKeys()...)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
|
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
|
||||||
|
11
pkg/common/db/cache/rockscache.go
vendored
11
pkg/common/db/cache/rockscache.go
vendored
@ -4,8 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
"github.com/dtm-labs/rockscache"
|
"github.com/dtm-labs/rockscache"
|
||||||
)
|
)
|
||||||
@ -22,8 +24,8 @@ type metaCache interface {
|
|||||||
GetPreDeleteKeys() []string
|
GetPreDeleteKeys() []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetaCacheRedis(rcClient *rockscache.Client) metaCache {
|
func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
|
||||||
return &metaCacheRedis{rcClient: rcClient}
|
return &metaCacheRedis{rcClient: rcClient, keys: keys}
|
||||||
}
|
}
|
||||||
|
|
||||||
type metaCacheRedis struct {
|
type metaCacheRedis struct {
|
||||||
@ -33,6 +35,7 @@ type metaCacheRedis struct {
|
|||||||
|
|
||||||
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
|
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
|
||||||
if len(m.keys) > 0 {
|
if len(m.keys) > 0 {
|
||||||
|
log.ZDebug(ctx, "DelKey", "keys", m.keys)
|
||||||
return m.rcClient.TagAsDeletedBatch2(ctx, m.keys)
|
return m.rcClient.TagAsDeletedBatch2(ctx, m.keys)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -86,10 +89,9 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
|
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
|
||||||
var tArrays []T
|
|
||||||
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
|
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
|
||||||
values := make(map[int]string)
|
values := make(map[int]string)
|
||||||
tArrays, err = fn(ctx)
|
tArrays, err := fn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -109,6 +111,7 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
var tArrays []T
|
||||||
for _, v := range batchMap {
|
for _, v := range batchMap {
|
||||||
if v != "" {
|
if v != "" {
|
||||||
var t T
|
var t T
|
||||||
|
2
pkg/common/db/cache/user.go
vendored
2
pkg/common/db/cache/user.go
vendored
@ -44,7 +44,7 @@ func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationTb.UserModelInt
|
|||||||
|
|
||||||
func (u *UserCacheRedis) NewCache() UserCache {
|
func (u *UserCacheRedis) NewCache() UserCache {
|
||||||
return &UserCacheRedis{
|
return &UserCacheRedis{
|
||||||
metaCache: u.metaCache,
|
metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDeleteKeys()...),
|
||||||
userDB: u.userDB,
|
userDB: u.userDB,
|
||||||
expireTime: u.expireTime,
|
expireTime: u.expireTime,
|
||||||
rcClient: u.rcClient,
|
rcClient: u.rcClient,
|
||||||
|
@ -75,8 +75,10 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cache = cache.DelConversationIDs(NotUserIDs)
|
cache = cache.DelConversationIDs(NotUserIDs)
|
||||||
|
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDeleteKeys(), "addr", &cache)
|
||||||
}
|
}
|
||||||
// clear cache
|
// clear cache
|
||||||
|
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDeleteKeys(), "addr", &cache)
|
||||||
return cache.DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
|
return cache.DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user