mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-27 14:02:15 +08:00
Merge branch 'openimsdk:main' into main
This commit is contained in:
commit
e2bf593336
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.47
|
github.com/openimsdk/protocol v0.0.69-alpha.50
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.55
|
github.com/openimsdk/tools v0.0.49-alpha.55
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.47 h1:WEpU7dHSzcpiyPoUkgSt1mC9HfQ6xSDNNZf4KWbZiFI=
|
github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
|
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
|
|||||||
@ -4,13 +4,16 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
pbuser "github.com/openimsdk/protocol/user"
|
pbuser "github.com/openimsdk/protocol/user"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -78,8 +81,10 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
opIdCtx := mcontext.SetOperationID(context.Background(), "r"+strconv.FormatUint(rNum, 10))
|
var count atomic.Int64
|
||||||
|
operationIDPrefix := fmt.Sprintf("p_%d_", os.Getpid())
|
||||||
doRequest := func(req *pbuser.SetUserOnlineStatusReq) {
|
doRequest := func(req *pbuser.SetUserOnlineStatusReq) {
|
||||||
|
opIdCtx := mcontext.SetOperationID(context.Background(), operationIDPrefix+strconv.FormatInt(count.Add(1), 10))
|
||||||
ctx, cancel := context.WithTimeout(opIdCtx, time.Second*5)
|
ctx, cancel := context.WithTimeout(opIdCtx, time.Second*5)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
|
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
|
||||||
@ -102,7 +107,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
|
|||||||
case now := <-renewalTicker.C:
|
case now := <-renewalTicker.C:
|
||||||
deadline := now.Add(-cachekey.OnlineExpire / 3)
|
deadline := now.Add(-cachekey.OnlineExpire / 3)
|
||||||
users := ws.clients.GetAllUserStatus(deadline, now)
|
users := ws.clients.GetAllUserStatus(deadline, now)
|
||||||
log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users))
|
log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users), "users", users)
|
||||||
pushUserState(users...)
|
pushUserState(users...)
|
||||||
case state := <-ws.clients.UserState():
|
case state := <-ws.clients.UserState():
|
||||||
log.ZDebug(context.Background(), "OnlineCache user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
|
log.ZDebug(context.Background(), "OnlineCache user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
|
||||||
|
|||||||
@ -1,6 +1,10 @@
|
|||||||
package msggateway
|
package msggateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -117,6 +121,7 @@ func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (u *userMap) Set(userID string, client *Client) {
|
func (u *userMap) Set(userID string, client *Client) {
|
||||||
|
log.ZDebug(context.Background(), "userMap Set", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "pointer", fmt.Sprintf("%p", client))
|
||||||
u.lock.Lock()
|
u.lock.Lock()
|
||||||
defer u.lock.Unlock()
|
defer u.lock.Unlock()
|
||||||
result, ok := u.data[userID]
|
result, ok := u.data[userID]
|
||||||
@ -162,12 +167,12 @@ func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
|
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) (result []UserState) {
|
||||||
u.lock.RLock()
|
u.lock.RLock()
|
||||||
defer u.lock.RUnlock()
|
defer u.lock.RUnlock()
|
||||||
result := make([]UserState, 0, len(u.data))
|
result = make([]UserState, 0, len(u.data))
|
||||||
for userID, userPlatform := range u.data {
|
for userID, userPlatform := range u.data {
|
||||||
if userPlatform.Time.Before(deadline) {
|
if deadline.Before(userPlatform.Time) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
userPlatform.Time = nowtime
|
userPlatform.Time = nowtime
|
||||||
|
|||||||
@ -202,7 +202,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
|
|||||||
var result []*msggateway.SingleMsgToUserResults
|
var result []*msggateway.SingleMsgToUserResults
|
||||||
if len(onlineUserIDs) > 0 {
|
if len(onlineUserIDs) > 0 {
|
||||||
var err error
|
var err error
|
||||||
result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, onlineUserIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,13 +16,16 @@ package conversation
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||||
"github.com/openimsdk/tools/db/redisutil"
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
@ -40,10 +43,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type conversationServer struct {
|
type conversationServer struct {
|
||||||
msgRpcClient *rpcclient.MessageRpcClient
|
msgRpcClient *rpcclient.MessageRpcClient
|
||||||
user *rpcclient.UserRpcClient
|
user *rpcclient.UserRpcClient
|
||||||
groupRpcClient *rpcclient.GroupRpcClient
|
groupRpcClient *rpcclient.GroupRpcClient
|
||||||
conversationDatabase controller.ConversationDatabase
|
conversationDatabase controller.ConversationDatabase
|
||||||
|
|
||||||
conversationNotificationSender *ConversationNotificationSender
|
conversationNotificationSender *ConversationNotificationSender
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
||||||
var conversation tablerelation.Conversation
|
var conversation dbModel.Conversation
|
||||||
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
|
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
|
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
var unequal int
|
var unequal int
|
||||||
var conv tablerelation.Conversation
|
var conv dbModel.Conversation
|
||||||
if len(req.UserIDs) == 1 {
|
if len(req.UserIDs) == 1 {
|
||||||
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
|
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
|||||||
}
|
}
|
||||||
conv = *cs[0]
|
conv = *cs[0]
|
||||||
}
|
}
|
||||||
var conversation tablerelation.Conversation
|
var conversation dbModel.Conversation
|
||||||
conversation.ConversationID = req.Conversation.ConversationID
|
conversation.ConversationID = req.Conversation.ConversationID
|
||||||
conversation.ConversationType = req.Conversation.ConversationType
|
conversation.ConversationType = req.Conversation.ConversationType
|
||||||
conversation.UserID = req.Conversation.UserID
|
conversation.UserID = req.Conversation.UserID
|
||||||
@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
|
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
|
||||||
var conversations []*tablerelation.Conversation
|
var conversations []*dbModel.Conversation
|
||||||
for _, ownerUserID := range req.UserIDs {
|
for _, ownerUserID := range req.UserIDs {
|
||||||
conversation2 := conversation
|
conversation2 := conversation
|
||||||
conversation2.OwnerUserID = ownerUserID
|
conversation2.OwnerUserID = ownerUserID
|
||||||
@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
|||||||
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
||||||
switch req.ConversationType {
|
switch req.ConversationType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
var conversation tablerelation.Conversation
|
var conversation dbModel.Conversation
|
||||||
conversation.ConversationID = req.ConversationID
|
conversation.ConversationID = req.ConversationID
|
||||||
conversation.ConversationType = req.ConversationType
|
conversation.ConversationType = req.ConversationType
|
||||||
conversation.OwnerUserID = req.SendID
|
conversation.OwnerUserID = req.SendID
|
||||||
conversation.UserID = req.RecvID
|
conversation.UserID = req.RecvID
|
||||||
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
|
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
|
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
|
||||||
}
|
}
|
||||||
@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
|||||||
conversation2 := conversation
|
conversation2 := conversation
|
||||||
conversation2.OwnerUserID = req.RecvID
|
conversation2.OwnerUserID = req.RecvID
|
||||||
conversation2.UserID = req.SendID
|
conversation2.UserID = req.SendID
|
||||||
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
|
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||||
}
|
}
|
||||||
case constant.NotificationChatType:
|
case constant.NotificationChatType:
|
||||||
var conversation tablerelation.Conversation
|
var conversation dbModel.Conversation
|
||||||
conversation.ConversationID = req.ConversationID
|
conversation.ConversationID = req.ConversationID
|
||||||
conversation.ConversationType = req.ConversationType
|
conversation.ConversationType = req.ConversationType
|
||||||
conversation.OwnerUserID = req.RecvID
|
conversation.OwnerUserID = req.RecvID
|
||||||
conversation.UserID = req.SendID
|
conversation.UserID = req.SendID
|
||||||
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
|
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||||
}
|
}
|
||||||
@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
|
|||||||
if req.MaxSeq != nil {
|
if req.MaxSeq != nil {
|
||||||
m["max_seq"] = req.MaxSeq.Value
|
m["max_seq"] = req.MaxSeq.Value
|
||||||
}
|
}
|
||||||
|
if req.LatestMsgDestructTime != nil {
|
||||||
|
m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value)
|
||||||
|
}
|
||||||
if len(m) > 0 {
|
if len(m) > 0 {
|
||||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
|
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -602,3 +609,53 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
|
|||||||
Conversations: convert.ConversationsDB2Pb(conversations),
|
Conversations: convert.ConversationsDB2Pb(conversations),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
|
||||||
|
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
const batchNum = 100
|
||||||
|
|
||||||
|
if num == 0 {
|
||||||
|
return nil, errs.New("Need Destruct Msg is nil").Wrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
maxPage := (num + batchNum - 1) / batchNum
|
||||||
|
|
||||||
|
temp := make([]*model.Conversation, 0, maxPage*batchNum)
|
||||||
|
|
||||||
|
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
|
||||||
|
pagination := &sdkws.RequestPagination{
|
||||||
|
PageNumber: int32(pageNumber),
|
||||||
|
ShowNumber: batchNum,
|
||||||
|
}
|
||||||
|
|
||||||
|
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
|
||||||
|
if len(conversationIDs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, conversation := range conversations {
|
||||||
|
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
|
||||||
|
conversation.LatestMsgDestructTime.IsZero()) {
|
||||||
|
temp = append(temp, conversation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -2,16 +2,22 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/protocol/conversation"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/wrapperspb"
|
"github.com/openimsdk/protocol/wrapperspb"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"strings"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"time"
|
"github.com/openimsdk/tools/utils/idutil"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// hard delete in Database.
|
||||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
||||||
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -25,18 +31,6 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
start = time.Now()
|
start = time.Now()
|
||||||
)
|
)
|
||||||
clearMsg := func(ctx context.Context) (bool, error) {
|
clearMsg := func(ctx context.Context) (bool, error) {
|
||||||
conversationSeqs := make(map[string]struct{})
|
|
||||||
defer func() {
|
|
||||||
req := &conversation.UpdateConversationReq{
|
|
||||||
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
|
|
||||||
}
|
|
||||||
for conversationID := range conversationSeqs {
|
|
||||||
req.ConversationID = conversationID
|
|
||||||
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
|
|
||||||
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
|
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@ -44,6 +38,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
|
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -52,15 +47,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
if len(index) == 0 {
|
if len(index) == 0 {
|
||||||
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
|
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
docNum++
|
docNum++
|
||||||
msgNum += len(index)
|
msgNum += len(index)
|
||||||
conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")]
|
|
||||||
if _, ok := conversationSeqs[conversationID]; !ok {
|
|
||||||
conversationSeqs[conversationID] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
keep, err := clearMsg(ctx)
|
keep, err := clearMsg(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -71,7 +65,60 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
||||||
}
|
}
|
||||||
return &msg.ClearMsgResp{}, nil
|
return &msg.ClearMsgResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// soft delete for self
|
||||||
|
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
||||||
|
temp := convert.ConversationsPb2DB(req.Conversations)
|
||||||
|
|
||||||
|
batchNum := 100
|
||||||
|
|
||||||
|
errg, _ := errgroup.WithContext(ctx)
|
||||||
|
errg.SetLimit(100)
|
||||||
|
|
||||||
|
for i := 0; i < len(temp); i += batchNum {
|
||||||
|
batch := temp[i:min(i+batchNum, len(temp))]
|
||||||
|
|
||||||
|
errg.Go(func() error {
|
||||||
|
for _, conversation := range batch {
|
||||||
|
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
|
||||||
|
log.ZDebug(handleCtx, "User MsgsDestruct",
|
||||||
|
"conversationID", conversation.ConversationID,
|
||||||
|
"ownerUserID", conversation.OwnerUserID,
|
||||||
|
"msgDestructTime", conversation.MsgDestructTime,
|
||||||
|
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
|
||||||
|
|
||||||
|
seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(seqs) > 0 {
|
||||||
|
if err := m.Conversation.UpdateConversation(handleCtx,
|
||||||
|
&pbconversation.UpdateConversationReq{
|
||||||
|
UserIDs: []string{conversation.OwnerUserID},
|
||||||
|
ConversationID: conversation.ConversationID,
|
||||||
|
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
|
||||||
|
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if you need Notify SDK client userseq is update.
|
||||||
|
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := errg.Wait(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
@ -50,6 +51,7 @@ type (
|
|||||||
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
|
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
|
||||||
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
|
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
|
||||||
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
|
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
|
||||||
|
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
|
||||||
config *Config // Global configuration settings.
|
config *Config // Global configuration settings.
|
||||||
webhookClient *webhook.Client
|
webhookClient *webhook.Client
|
||||||
}
|
}
|
||||||
@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
|
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
|
||||||
|
s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
|
||||||
|
|
||||||
msg.RegisterMsgServer(server, s)
|
msg.RegisterMsgServer(server, s)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,16 +17,19 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/third"
|
"github.com/openimsdk/protocol/third"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/mw"
|
"github.com/openimsdk/tools/mw"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
@ -50,34 +53,69 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
|
|||||||
}
|
}
|
||||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
|
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
|
||||||
conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
|
|
||||||
|
msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cli := msg.NewMsgClient(conn)
|
|
||||||
|
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgClient := msg.NewMsgClient(msgConn)
|
||||||
|
conversationClient := pbconversation.NewConversationClient(conversationConn)
|
||||||
|
thirdClient := third.NewThirdClient(thirdConn)
|
||||||
|
|
||||||
crontab := cron.New()
|
crontab := cron.New()
|
||||||
clearFunc := func() {
|
|
||||||
|
// scheduled hard delete outdated Msgs in specific time.
|
||||||
|
clearMsgFunc := func() {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
|
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
|
||||||
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
|
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
|
||||||
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
||||||
if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
|
if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
|
||||||
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
|
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
|
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
|
||||||
}
|
}
|
||||||
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil {
|
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
|
// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
|
||||||
if err != nil {
|
msgDestructFunc := func() {
|
||||||
return err
|
now := time.Now()
|
||||||
}
|
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
|
||||||
thirdClient := third.NewThirdClient(tConn)
|
log.ZInfo(ctx, "msg destruct cron start", "now", now)
|
||||||
|
|
||||||
deleteFunc := func() {
|
conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "Destruct Msgs failed.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now))
|
||||||
|
}
|
||||||
|
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// scheduled delete outdated file Objects and their datas in specific time.
|
||||||
|
deleteObjectFunc := func() {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
|
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
|
||||||
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
|
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
|
||||||
@ -88,9 +126,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
|
|||||||
}
|
}
|
||||||
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
|
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
|
||||||
}
|
}
|
||||||
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
|
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
|
log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
|
||||||
crontab.Start()
|
crontab.Start()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|||||||
@ -22,7 +22,7 @@ import (
|
|||||||
|
|
||||||
func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation {
|
func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation {
|
||||||
conversationPB := &conversation.Conversation{}
|
conversationPB := &conversation.Conversation{}
|
||||||
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
|
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
|
||||||
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
|
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -35,7 +35,7 @@ func ConversationsDB2Pb(conversationsDB []*model.Conversation) (conversationsPB
|
|||||||
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
|
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
|
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
|
||||||
conversationsPB = append(conversationsPB, conversationPB)
|
conversationsPB = append(conversationsPB, conversationPB)
|
||||||
}
|
}
|
||||||
return conversationsPB
|
return conversationsPB
|
||||||
|
|||||||
@ -82,7 +82,7 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
|
func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
|
||||||
_, err := c.Client.UpdateConversation(ctx, conversation)
|
_, err := c.Client.UpdateConversation(ctx, conversation)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -146,3 +146,11 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont
|
|||||||
}
|
}
|
||||||
return resp.UserIDs, nil
|
return resp.UserIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) {
|
||||||
|
resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp.Conversations, nil
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user