mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
conversationID
This commit is contained in:
parent
01f773c0f8
commit
e8e306eb98
@ -50,7 +50,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
|
||||
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
|
||||
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -58,16 +57,17 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
|
||||
if len(conversations) < 1 {
|
||||
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
|
||||
}
|
||||
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
|
||||
resp.Conversation = convert.ConversationDB2Pb(conversations[0])
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
|
||||
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
|
||||
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
|
||||
resp.Conversations = convert.ConversationsDB2Pb(conversations)
|
||||
return resp, nil
|
||||
}
|
||||
@ -297,7 +297,7 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbConv
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *pbConversation.GetConversationsHasReadAndMaxSeqReq) (*pbConversation.GetConversationsHasReadAndMaxSeqResp, error) {
|
||||
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.UserID)
|
||||
conversations, err := c.conversationDatabase.GetUserAllHasReadSeqs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -306,10 +306,10 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex
|
||||
return nil, err
|
||||
}
|
||||
resp := &pbConversation.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*pbConversation.Seqs)}
|
||||
for _, v := range conversations {
|
||||
resp.Seqs[v.ConversationID] = &pbConversation.Seqs{
|
||||
HasReadSeq: v.HasReadSeq,
|
||||
MaxSeq: maxSeqs.MaxSeqs[v.ConversationID],
|
||||
for conversationID, seq := range conversations {
|
||||
resp.Seqs[conversationID] = &pbConversation.Seqs{
|
||||
HasReadSeq: seq,
|
||||
MaxSeq: maxSeqs.MaxSeqs[conversationID],
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
|
@ -21,7 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
|
||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
|
||||
return nil, err
|
||||
}
|
||||
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData)
|
||||
err = m.MsgDatabase.MsgToMQ(ctx, utils.GetConversationIDByMsg(req.MsgData), req.MsgData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
59
pkg/common/db/cache/conversation.go
vendored
59
pkg/common/db/cache/conversation.go
vendored
@ -18,6 +18,7 @@ const (
|
||||
conversationKey = "CONVERSATION:"
|
||||
conversationIDsKey = "CONVERSATION_IDS:"
|
||||
conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
|
||||
conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
|
||||
recvMsgOptKey = "RECV_MSG_OPT:"
|
||||
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
||||
@ -53,6 +54,9 @@ type ConversationCache interface {
|
||||
// get one super group recv msg but do not notification userID list hash
|
||||
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
|
||||
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
|
||||
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
||||
DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
|
||||
}
|
||||
|
||||
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache {
|
||||
@ -96,6 +100,22 @@ func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupI
|
||||
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string {
|
||||
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getAllConversationIDsKeys(ctx context.Context, ownerUserID string) ([]string, []string, error) {
|
||||
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
var keys []string
|
||||
for _, conversarionID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
|
||||
}
|
||||
return keys, conversationIDs, nil
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
||||
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
|
||||
@ -155,7 +175,7 @@ func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersa
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) {
|
||||
func (c *ConversationRedisCache) getConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) {
|
||||
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
|
||||
for _i, _key := range keys {
|
||||
if _key == key {
|
||||
@ -170,21 +190,17 @@ func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUser
|
||||
for _, conversarionID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
|
||||
}
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
|
||||
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
||||
keys, _, err := c.getAllConversationIDsKeys(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var keys []string
|
||||
for _, conversarionID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
|
||||
}
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
|
||||
})
|
||||
}
|
||||
@ -241,3 +257,30 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI
|
||||
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID))
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
|
||||
for _i, _conversationID := range conversationIDs {
|
||||
if _conversationID == conversationID {
|
||||
return _i, nil
|
||||
}
|
||||
}
|
||||
return 0, errors.New("not found key:" + conversationID + " in keys")
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
|
||||
keys, conversationIDs, err := c.getAllConversationIDsKeys(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
|
||||
return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
|
||||
cache := c.NewCache()
|
||||
for _, conversationID := range conversationIDs {
|
||||
cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
10
pkg/common/db/cache/meta_cache.go
vendored
10
pkg/common/db/cache/meta_cache.go
vendored
@ -153,15 +153,15 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys
|
||||
return tArrays, nil
|
||||
}
|
||||
|
||||
func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, originKeys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) {
|
||||
func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys, originKeys []string, expire time.Duration, keyIndexFn func(s string, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) {
|
||||
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
|
||||
values := make(map[int]string)
|
||||
tArrays, err := fn(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range tArrays {
|
||||
index, err := keyIndexFn(v, keys)
|
||||
values := make(map[int]string)
|
||||
for k, v := range tArrays {
|
||||
index, err := keyIndexFn(k, originKeys)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -184,7 +184,7 @@ func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, k
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "unmarshal failed")
|
||||
}
|
||||
tMap[keys[i]] = t
|
||||
tMap[originKeys[i]] = t
|
||||
}
|
||||
}
|
||||
return tMap, nil
|
||||
|
4
pkg/common/db/cache/msg.go
vendored
4
pkg/common/db/cache/msg.go
vendored
@ -251,14 +251,14 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string,
|
||||
failedSeqs = append(failedSeqs, seqs[i])
|
||||
} else {
|
||||
msg := sdkws.MsgData{}
|
||||
err = jsonpb.UnmarshalString(cmd.Val(), &msg)
|
||||
err = utils.String2Pb(cmd.Val(), &msg)
|
||||
if err == nil {
|
||||
if msg.Status != constant.MsgDeleted {
|
||||
seqMsgs = append(seqMsgs, &msg)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i])
|
||||
log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val())
|
||||
}
|
||||
failedSeqs = append(failedSeqs, seqs[i])
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ type ConversationDatabase interface {
|
||||
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
|
||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
@ -250,3 +251,7 @@ func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, o
|
||||
func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) {
|
||||
return c.conversationDB.GetAllConversationIDs(ctx)
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
|
||||
return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
|
||||
}
|
||||
|
@ -80,3 +80,13 @@ func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, c
|
||||
func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) {
|
||||
return conversationIDs, utils.Wrap(c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) {
|
||||
var conversations []*relation.ConversationModel
|
||||
err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "")
|
||||
hasReadSeqs = make(map[string]int64, len(conversations))
|
||||
for _, conversation := range conversations {
|
||||
hasReadSeqs[conversation.ConversationID] = conversation.HasReadSeq
|
||||
}
|
||||
return hasReadSeqs, err
|
||||
}
|
||||
|
@ -45,5 +45,6 @@ type ConversationModelInterface interface {
|
||||
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
|
||||
FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error)
|
||||
NewTx(tx any) ConversationModelInterface
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user