Compare commits

...

3 Commits

Author SHA1 Message Date
xuzhijvn
0e6ccd49dd
Merge 8cd57d68535f83c6495423e777f6a30c88cbe0f1 into 6f33c0a51518e14e6be37e3ff420f73b49e58d18 2025-12-23 18:30:19 +08:00
ribin2333
6f33c0a515
fix: reset user conversation seq when rejoining group to resolve message recall issue (#3640)
* fix: reset user conversation seq when rejoining group to resolve message recall issue

* fix: refactor setMemberJoinSeq based on review feedback

* group: 入群个人上限重置为不受限值;退出个人上限固化;通知控制入群 minSeq
2025-12-23 06:37:32 +00:00
xuzhijvn
8cd57d6853 fix: the loop querying the db in func batchGetCache2 (#3301) 2025-04-24 15:32:41 +08:00
2 changed files with 39 additions and 7 deletions

View File

@ -17,6 +17,7 @@ package group
import (
"context"
"fmt"
"math"
"math/big"
"math/rand"
"strconv"
@ -472,6 +473,9 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...)
}
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
return nil, err
}
return &pbgroup.InviteUserToGroupResp{}, nil
}
@ -905,6 +909,9 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
return nil, err
}
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, []string{req.FromUserID}); err != nil {
return nil, err
}
}
case constant.GroupResponseRefuse:
g.notification.GroupApplicationRejectedNotification(ctx, req)
@ -967,6 +974,9 @@ func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil {
return nil, err
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
return nil, err
}
g.webhookAfterJoinGroup(ctx, &g.config.WebhooksConfig.AfterJoinGroup, req)
return &pbgroup.JoinGroupResp{}, nil
@ -1028,6 +1038,11 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq)
}
func (g *groupServer) setMemberJoinSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, math.MaxInt64)
}
func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) {
var opMember *model.GroupMember
if !authverify.IsAdmin(ctx) {

View File

@ -76,6 +76,27 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
if err != nil {
return nil, err
}
values, err := fn(ctx, ids)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "ids", ids)
return nil, err
}
idToValue := make(map[K]*V)
for _, value := range values {
idToValue[vId(value)] = value
}
getSlotValues := func(slotIds []K) []*V {
if len(slotIds) == 0 {
return nil
}
slotValues := make([]*V, 0, len(slotIds))
for _, id := range slotIds {
if value, ok := idToValue[id]; ok {
slotValues = append(slotValues, value)
}
}
return slotValues
}
result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys {
indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
@ -86,16 +107,12 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
idIndex[id] = index
queryIds = append(queryIds, id)
}
values, err := fn(ctx, queryIds)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "keys", keys, "queryIds", queryIds)
return nil, err
}
if len(values) == 0 {
slotValues := getSlotValues(queryIds)
if len(slotValues) == 0 {
return map[int]string{}, nil
}
cacheIndex := make(map[int]string)
for _, value := range values {
for _, value := range slotValues {
id := vId(value)
index, ok := idIndex[id]
if !ok {