second reset seq

This commit is contained in:
withchao 2023-06-19 10:22:25 +08:00
parent e0fd13bf4f
commit 698515549b
2 changed files with 28 additions and 5 deletions

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui"
@ -32,6 +31,8 @@ type Pusher struct {
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
groupLocalCache *localcache.GroupLocalCache groupLocalCache *localcache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache conversationLocalCache *localcache.ConversationLocalCache
msgClient *rpcclient.MsgClient
conversationClient *rpcclient.ConversationClient
successCount int successCount int
} }
@ -46,6 +47,8 @@ func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl
offlinePusher: offlinePusher, offlinePusher: offlinePusher,
groupLocalCache: groupLocalCache, groupLocalCache: groupLocalCache,
conversationLocalCache: conversationLocalCache, conversationLocalCache: conversationLocalCache,
msgClient: rpcclient.NewMsgClient(client),
conversationClient: rpcclient.NewConversationClient(client),
} }
} }
@ -75,6 +78,15 @@ func (p *Pusher) DismissGroup(ctx context.Context, groupID string) error {
return err return err
} }
func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
maxSeq, err := p.msgClient.GetConversationMaxSeq(ctx, conevrsationID)
if err != nil {
return err
}
return p.conversationClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)
}
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
// callback // callback
@ -145,6 +157,11 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err return err
} }
defer func(groupID string, userIDs []string) {
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
}
}(groupID, []string{tips.QuitUser.UserID})
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID) pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
case constant.MemberKickedNotification: case constant.MemberKickedNotification:
var tips sdkws.MemberKickedTips var tips sdkws.MemberKickedTips
@ -152,6 +169,11 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
return err return err
} }
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
defer func(groupID string, userIDs []string) {
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
}
}(groupID, kickedUsers)
pushToUserIDs = append(pushToUserIDs, kickedUsers...) pushToUserIDs = append(pushToUserIDs, kickedUsers...)
case constant.GroupDismissedNotification: case constant.GroupDismissedNotification:
if utils.IsNotification(utils.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 if utils.IsNotification(utils.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
@ -163,9 +185,11 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if len(config.Config.Manager.AppManagerUid) > 0 { if len(config.Config.Manager.AppManagerUid) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.AppManagerUid[0]) ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.AppManagerUid[0])
} }
if err := p.DismissGroup(ctx, groupID); err != nil { defer func(groupID string) {
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) if err := p.DismissGroup(ctx, groupID); err != nil {
} log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
}
}(groupID)
} }
} }
} }

View File

@ -806,7 +806,6 @@ func (s *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
maxSeq, err := s.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) maxSeq, err := s.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID)
if err != nil { if err != nil {
return err return err
} }
return s.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) return s.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)