mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	online cache
This commit is contained in:
		
							parent
							
								
									3ef62d4aa5
								
							
						
					
					
						commit
						3167780620
					
				@ -45,6 +45,7 @@ type ConsumerHandler struct {
 | 
				
			|||||||
	pushConsumerGroup      *kafka.MConsumerGroup
 | 
						pushConsumerGroup      *kafka.MConsumerGroup
 | 
				
			||||||
	offlinePusher          offlinepush.OfflinePusher
 | 
						offlinePusher          offlinepush.OfflinePusher
 | 
				
			||||||
	onlinePusher           OnlinePusher
 | 
						onlinePusher           OnlinePusher
 | 
				
			||||||
 | 
						onlineCache            *rpccache.OnlineCache
 | 
				
			||||||
	groupLocalCache        *rpccache.GroupLocalCache
 | 
						groupLocalCache        *rpccache.GroupLocalCache
 | 
				
			||||||
	conversationLocalCache *rpccache.ConversationLocalCache
 | 
						conversationLocalCache *rpccache.ConversationLocalCache
 | 
				
			||||||
	msgRpcClient           rpcclient.MessageRpcClient
 | 
						msgRpcClient           rpcclient.MessageRpcClient
 | 
				
			||||||
@ -63,16 +64,17 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
 | 
				
			||||||
	consumerHandler.offlinePusher = offlinePusher
 | 
						consumerHandler.offlinePusher = offlinePusher
 | 
				
			||||||
	consumerHandler.onlinePusher = NewOnlinePusher(client, config)
 | 
						consumerHandler.onlinePusher = NewOnlinePusher(client, config)
 | 
				
			||||||
	consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
 | 
						consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
 | 
				
			||||||
	consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb)
 | 
						consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb)
 | 
				
			||||||
	consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
 | 
						consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
 | 
				
			||||||
	consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
 | 
						consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
 | 
				
			||||||
	consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient,
 | 
						consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
 | 
				
			||||||
		&config.LocalCacheConfig, rdb)
 | 
					 | 
				
			||||||
	consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
 | 
						consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
 | 
				
			||||||
	consumerHandler.config = config
 | 
						consumerHandler.config = config
 | 
				
			||||||
 | 
						consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb)
 | 
				
			||||||
	return &consumerHandler, nil
 | 
						return &consumerHandler, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -100,11 +102,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
 | 
				
			|||||||
		var pushUserIDList []string
 | 
							var pushUserIDList []string
 | 
				
			||||||
		isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
 | 
							isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
 | 
				
			||||||
		if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
 | 
							if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
 | 
				
			||||||
			pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
 | 
								pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID})
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
 | 
								pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID})
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err == nil {
 | 
				
			||||||
 | 
								err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
 | 
							log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
 | 
				
			||||||
@ -233,7 +237,8 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
 | 
					func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
 | 
				
			||||||
	if len(*pushToUserIDs) == 0 {
 | 
						if len(*pushToUserIDs) == 0 {
 | 
				
			||||||
		*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
 | 
							//*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
 | 
				
			||||||
 | 
							*pushToUserIDs, err = c.onlineCache.GetGroupOnline(ctx, groupID) //
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
				
			|||||||
@ -3,6 +3,7 @@ package redis
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						"github.com/redis/go-redis/v9"
 | 
				
			||||||
	"log"
 | 
						"log"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
@ -15,7 +16,7 @@ func newTestOnline() *userOnline {
 | 
				
			|||||||
	opt := &redis.Options{
 | 
						opt := &redis.Options{
 | 
				
			||||||
		Addr:     "172.16.8.48:16379",
 | 
							Addr:     "172.16.8.48:16379",
 | 
				
			||||||
		Password: "openIM123",
 | 
							Password: "openIM123",
 | 
				
			||||||
		DB:       1,
 | 
							DB:       0,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	rdb := redis.NewClient(opt)
 | 
						rdb := redis.NewClient(opt)
 | 
				
			||||||
	if err := rdb.Ping(context.Background()).Err(); err != nil {
 | 
						if err := rdb.Ping(context.Background()).Err(); err != nil {
 | 
				
			||||||
@ -63,7 +64,7 @@ func TestGetOnline(t *testing.T) {
 | 
				
			|||||||
func TestRecvOnline(t *testing.T) {
 | 
					func TestRecvOnline(t *testing.T) {
 | 
				
			||||||
	ts := newTestOnline()
 | 
						ts := newTestOnline()
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	pubsub := ts.rdb.Subscribe(ctx, "user_online")
 | 
						pubsub := ts.rdb.Subscribe(ctx, cachekey.OnlineChannel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 等待订阅确认
 | 
						// 等待订阅确认
 | 
				
			||||||
	_, err := pubsub.Receive(ctx)
 | 
						_, err := pubsub.Receive(ctx)
 | 
				
			||||||
 | 
				
			|||||||
@ -31,6 +31,12 @@ type Cache[V any] interface {
 | 
				
			|||||||
	Stop()
 | 
						Stop()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func LRUStringHash(key string) uint64 {
 | 
				
			||||||
 | 
						h := fnv.New64a()
 | 
				
			||||||
 | 
						h.Write(*(*[]byte)(unsafe.Pointer(&key)))
 | 
				
			||||||
 | 
						return h.Sum64()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func New[V any](opts ...Option) Cache[V] {
 | 
					func New[V any](opts ...Option) Cache[V] {
 | 
				
			||||||
	opt := defaultOption()
 | 
						opt := defaultOption()
 | 
				
			||||||
	for _, o := range opts {
 | 
						for _, o := range opts {
 | 
				
			||||||
@ -49,11 +55,7 @@ func New[V any](opts ...Option) Cache[V] {
 | 
				
			|||||||
		if opt.localSlotNum == 1 {
 | 
							if opt.localSlotNum == 1 {
 | 
				
			||||||
			c.local = createSimpleLRU()
 | 
								c.local = createSimpleLRU()
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 {
 | 
								c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU)
 | 
				
			||||||
				h := fnv.New64a()
 | 
					 | 
				
			||||||
				h.Write(*(*[]byte)(unsafe.Pointer(&key)))
 | 
					 | 
				
			||||||
				return h.Sum64()
 | 
					 | 
				
			||||||
			}, createSimpleLRU)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if opt.linkSlotNum > 0 {
 | 
							if opt.linkSlotNum > 0 {
 | 
				
			||||||
			c.link = link.New(opt.linkSlotNum)
 | 
								c.link = link.New(opt.linkSlotNum)
 | 
				
			||||||
 | 
				
			|||||||
@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type LRU[K comparable, V any] interface {
 | 
					type LRU[K comparable, V any] interface {
 | 
				
			||||||
	Get(key K, fetch func() (V, error)) (V, error)
 | 
						Get(key K, fetch func() (V, error)) (V, error)
 | 
				
			||||||
 | 
						SetHas(key K, value V) bool
 | 
				
			||||||
	Del(key K) bool
 | 
						Del(key K) bool
 | 
				
			||||||
	Stop()
 | 
						Stop()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -89,5 +89,15 @@ func (x *ExpirationLRU[K, V]) Del(key K) bool {
 | 
				
			|||||||
	return ok
 | 
						return ok
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool {
 | 
				
			||||||
 | 
						x.lock.Lock()
 | 
				
			||||||
 | 
						defer x.lock.Unlock()
 | 
				
			||||||
 | 
						if x.core.Contains(key) {
 | 
				
			||||||
 | 
							x.core.Add(key, &expirationLruItem[V]{value: value})
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return false
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (x *ExpirationLRU[K, V]) Stop() {
 | 
					func (x *ExpirationLRU[K, V]) Stop() {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -88,6 +88,28 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 | 
				
			|||||||
	return v.value, v.err
 | 
						return v.value, v.err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//func (x *LayLRU[K, V]) Set(key K, value V) {
 | 
				
			||||||
 | 
					//	x.lock.Lock()
 | 
				
			||||||
 | 
					//	x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
 | 
				
			||||||
 | 
					//	x.lock.Unlock()
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//func (x *LayLRU[K, V]) Has(key K) bool {
 | 
				
			||||||
 | 
					//	x.lock.Lock()
 | 
				
			||||||
 | 
					//	defer x.lock.Unlock()
 | 
				
			||||||
 | 
					//	return x.core.Contains(key)
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
 | 
				
			||||||
 | 
						x.lock.Lock()
 | 
				
			||||||
 | 
						defer x.lock.Unlock()
 | 
				
			||||||
 | 
						if x.core.Contains(key) {
 | 
				
			||||||
 | 
							x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return false
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (x *LayLRU[K, V]) Del(key K) bool {
 | 
					func (x *LayLRU[K, V]) Del(key K) bool {
 | 
				
			||||||
	x.lock.Lock()
 | 
						x.lock.Lock()
 | 
				
			||||||
	ok := x.core.Remove(key)
 | 
						ok := x.core.Remove(key)
 | 
				
			||||||
 | 
				
			|||||||
@ -40,6 +40,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 | 
				
			|||||||
	return x.slots[x.getIndex(key)].Get(key, fetch)
 | 
						return x.slots[x.getIndex(key)].Get(key, fetch)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (x *slotLRU[K, V]) SetHas(key K, value V) bool {
 | 
				
			||||||
 | 
						return x.slots[x.getIndex(key)].SetHas(key, value)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (x *slotLRU[K, V]) Del(key K) bool {
 | 
					func (x *slotLRU[K, V]) Del(key K) bool {
 | 
				
			||||||
	return x.slots[x.getIndex(key)].Del(key)
 | 
						return x.slots[x.getIndex(key)].Del(key)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -30,7 +30,7 @@ func defaultOption() *option {
 | 
				
			|||||||
		localSuccessTTL: time.Minute,
 | 
							localSuccessTTL: time.Minute,
 | 
				
			||||||
		localFailedTTL:  time.Second * 5,
 | 
							localFailedTTL:  time.Second * 5,
 | 
				
			||||||
		delFn:           make([]func(ctx context.Context, key ...string), 0, 2),
 | 
							delFn:           make([]func(ctx context.Context, key ...string), 0, 2),
 | 
				
			||||||
		target:          emptyTarget{},
 | 
							target:          EmptyTarget{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -123,14 +123,14 @@ func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type emptyTarget struct{}
 | 
					type EmptyTarget struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (e emptyTarget) IncrGetHit() {}
 | 
					func (e EmptyTarget) IncrGetHit() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (e emptyTarget) IncrGetSuccess() {}
 | 
					func (e EmptyTarget) IncrGetSuccess() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (e emptyTarget) IncrGetFailed() {}
 | 
					func (e EmptyTarget) IncrGetFailed() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (e emptyTarget) IncrDelHit() {}
 | 
					func (e EmptyTarget) IncrDelHit() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (e emptyTarget) IncrDelNotFound() {}
 | 
					func (e EmptyTarget) IncrDelNotFound() {}
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										121
									
								
								pkg/rpccache/online.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										121
									
								
								pkg/rpccache/online.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,121 @@
 | 
				
			|||||||
 | 
					package rpccache
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/localcache"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/log"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/mcontext"
 | 
				
			||||||
 | 
						"github.com/redis/go-redis/v9"
 | 
				
			||||||
 | 
						"math/rand"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient) *OnlineCache {
 | 
				
			||||||
 | 
						x := &OnlineCache{
 | 
				
			||||||
 | 
							user:  user,
 | 
				
			||||||
 | 
							group: group,
 | 
				
			||||||
 | 
							local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
 | 
				
			||||||
 | 
								return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
 | 
				
			||||||
 | 
							}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							parseUserOnlineStatus := func(payload string) (string, []int32, error) {
 | 
				
			||||||
 | 
								arr := strings.Split(payload, ":")
 | 
				
			||||||
 | 
								if len(arr) == 0 {
 | 
				
			||||||
 | 
									return "", nil, errors.New("invalid data")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								userID := arr[len(arr)-1]
 | 
				
			||||||
 | 
								if userID == "" {
 | 
				
			||||||
 | 
									return "", nil, errors.New("userID is empty")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								platformIDs := make([]int32, len(arr)-1)
 | 
				
			||||||
 | 
								for i := range platformIDs {
 | 
				
			||||||
 | 
									platformID, err := strconv.Atoi(arr[i])
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										return "", nil, err
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									platformIDs[i] = int32(platformID)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return userID, platformIDs, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
 | 
				
			||||||
 | 
							for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
 | 
				
			||||||
 | 
								userID, platformIDs, err := parseUserOnlineStatus(message.Payload)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									log.ZError(ctx, "redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								x.setUserOnline(userID, platformIDs)
 | 
				
			||||||
 | 
								//if err := x.setUserOnline(ctx, userID, platformIDs); err != nil {
 | 
				
			||||||
 | 
								//	log.ZError(ctx, "redis subscribe setUserOnline", err, "payload", message.Payload, "channel", message.Channel)
 | 
				
			||||||
 | 
								//}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						return x
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type OnlineCache struct {
 | 
				
			||||||
 | 
						user  rpcclient.UserRpcClient
 | 
				
			||||||
 | 
						group *GroupLocalCache
 | 
				
			||||||
 | 
						local lru.LRU[string, []int32]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) getUserOnlineKey(userID string) string {
 | 
				
			||||||
 | 
						return "<u>" + userID
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
 | 
				
			||||||
 | 
						return o.local.Get(userID, func() ([]int32, error) {
 | 
				
			||||||
 | 
							return o.user.GetUserOnlinePlatform(ctx, userID)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
 | 
				
			||||||
 | 
						platformIDs, err := o.GetUserOnlinePlatform(ctx, userID)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return len(platformIDs) > 0, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
 | 
				
			||||||
 | 
						onlineUserIDs := make([]string, 0, len(userIDs))
 | 
				
			||||||
 | 
						for _, userID := range userIDs {
 | 
				
			||||||
 | 
							online, err := o.GetUserOnline(ctx, userID)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if online {
 | 
				
			||||||
 | 
								onlineUserIDs = append(onlineUserIDs, userID)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return onlineUserIDs, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
 | 
				
			||||||
 | 
						userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var onlineUserIDs []string
 | 
				
			||||||
 | 
						for _, userID := range userIDs {
 | 
				
			||||||
 | 
							online, err := o.GetUserOnline(ctx, userID)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if online {
 | 
				
			||||||
 | 
								onlineUserIDs = append(onlineUserIDs, userID)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return onlineUserIDs, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) {
 | 
				
			||||||
 | 
						o.local.SetHas(o.getUserOnlineKey(userID), platformIDs)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -110,3 +110,18 @@ func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string)
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return users, nil
 | 
						return users, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//func (u *UserLocalCache) GetUserOnlinePlatform(ctx context.Context, userID string) (val []int32, err error) {
 | 
				
			||||||
 | 
					//	log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform req", "userID", userID)
 | 
				
			||||||
 | 
					//	defer func() {
 | 
				
			||||||
 | 
					//		if err == nil {
 | 
				
			||||||
 | 
					//			log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform return", "value", val)
 | 
				
			||||||
 | 
					//		} else {
 | 
				
			||||||
 | 
					//			log.ZError(ctx, "UserLocalCache GetUserOnlinePlatform return", err)
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//	}()
 | 
				
			||||||
 | 
					//	return localcache.AnyValue[[]int32](u.local.Get(ctx, cachekey.GetOnlineKey(userID), func(ctx context.Context) (any, error) {
 | 
				
			||||||
 | 
					//		log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID)
 | 
				
			||||||
 | 
					//		return u.client.GetUserGlobalMsgRecvOpt(ctx, userID)
 | 
				
			||||||
 | 
					//	}))
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
				
			|||||||
@ -193,3 +193,25 @@ func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string)
 | 
				
			|||||||
	})
 | 
						})
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (u *UserRpcClient) GetUsersOnlinePlatform(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
 | 
				
			||||||
 | 
						if len(userIDs) == 0 {
 | 
				
			||||||
 | 
							return nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						resp, err := u.Client.GetUserStatus(ctx, &user.GetUserStatusReq{UserIDs: userIDs})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return resp.StatusList, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (u *UserRpcClient) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
 | 
				
			||||||
 | 
						resp, err := u.GetUsersOnlinePlatform(ctx, []string{userID})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(resp) == 0 {
 | 
				
			||||||
 | 
							return nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return resp[0].PlatformIDs, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user