mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 20:11:14 +08:00
new feature data convert (#819)
* new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * MsgDestructTime * fix bug: msg destruct sql * fix bug: msg destruct * fix bug: msg destruct * fix bug: msg destruct sql * fix bug: msg destruct sql * fix bug: msg destruct sql * fix bug: msg destruct sql * debug: print stack * debug: print stack * debug: print stack * fix bug: msg destruct sql Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com> * fix bug: msg notification self 2 self push twice * fix bug: heartbeat get self notification * fix bug: init grpc conn in one process * fix bug: grpc conn Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com> * fix bug: zk client recreate node when reconn * fix bug: set friend mark args error * fix bug: rpc client intercepter called twice Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * test: document msg num set 100 * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * merge code * merge code * fix bug: repeat add friend not effect * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed --------- Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com> Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: wangchuxiao-dev <wangchuxiao-dev@users.noreply.github.com>
This commit is contained in:
parent
04a97ac154
commit
5615e47d99
@ -21,7 +21,7 @@ import (
|
||||
|
||||
func main() {
|
||||
cronTaskCmd := cmd.NewCronTaskCmd()
|
||||
if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil {
|
||||
if err := cronTaskCmd.Exec(tools.StartTask); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,6 @@
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@ -145,7 +145,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
len(modifyMsgList),
|
||||
)
|
||||
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
||||
conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message)
|
||||
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
||||
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
||||
och.handleNotification(
|
||||
ctx,
|
||||
|
@ -20,16 +20,12 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
pbuser "github.com/OpenIMSDK/protocol/user"
|
||||
registry "github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/tx"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
@ -40,10 +36,11 @@ import (
|
||||
tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
|
||||
registry "github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pbuser "github.com/OpenIMSDK/protocol/user"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type userServer struct {
|
||||
|
@ -26,12 +26,13 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
)
|
||||
|
||||
func StartCronTask() error {
|
||||
func StartTask() error {
|
||||
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
|
||||
msgTool, err := InitMsgTool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgTool.ConvertTools()
|
||||
c := cron.New()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
32
internal/tools/msg_doc_convert.go
Normal file
32
internal/tools/msg_doc_convert.go
Normal file
@ -0,0 +1,32 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
)
|
||||
|
||||
func (c *MsgTool) ConvertTools() {
|
||||
ctx := mcontext.NewCtx("convert")
|
||||
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get all conversation ids failed", err)
|
||||
return
|
||||
}
|
||||
for _, conversationID := range conversationIDs {
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationIDByConversationID(conversationID))
|
||||
}
|
||||
userIDs, err := c.userDatabase.GetAllUserID(ctx, 0, 0)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get all user ids failed", err)
|
||||
return
|
||||
}
|
||||
log.ZDebug(ctx, "all userIDs", "len userIDs", len(userIDs))
|
||||
for _, userID := range userIDs {
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID))
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID))
|
||||
}
|
||||
log.ZDebug(ctx, "all conversationIDs", "len userIDs", len(conversationIDs))
|
||||
c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
||||
}
|
@ -118,6 +118,7 @@ type CommonMsgDatabase interface {
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
||||
@ -965,3 +966,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.Searc
|
||||
}
|
||||
return total, totalMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
||||
}
|
||||
|
@ -86,7 +86,11 @@ func (u *UserGorm) Page(
|
||||
|
||||
// 获取所有用户ID.
|
||||
func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
|
||||
return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
|
||||
if pageNumber == 0 || showNumber == 0 {
|
||||
return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error)
|
||||
} else {
|
||||
return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
|
||||
|
@ -27,10 +27,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
singleGocMsgNum = 5000
|
||||
Msg = "msg"
|
||||
OldestList = 0
|
||||
NewestList = -1
|
||||
singleGocMsgNum = 100
|
||||
singleGocMsgNum5000 = 5000
|
||||
Msg = "msg"
|
||||
OldestList = 0
|
||||
NewestList = -1
|
||||
)
|
||||
|
||||
type MsgDocModel struct {
|
||||
@ -128,6 +129,7 @@ type MsgDocModelInterface interface {
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
func (MsgDocModel) TableName() string {
|
||||
@ -138,6 +140,10 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
|
||||
return singleGocMsgNum
|
||||
}
|
||||
|
||||
func (MsgDocModel) GetSingleGocMsgNum5000() int64 {
|
||||
return singleGocMsgNum5000
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) IsFull() bool {
|
||||
return m.Msg[len(m.Msg)-1].Msg != nil
|
||||
}
|
||||
|
67
pkg/common/db/unrelation/msg_convert.go
Normal file
67
pkg/common/db/unrelation/msg_convert.go
Normal file
@ -0,0 +1,67 @@
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
for _, conversationID := range conversationIDs {
|
||||
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var msgDocs []table.MsgDocModel
|
||||
err = cursor.All(ctx, &msgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
if len(msgDocs) < 1 {
|
||||
continue
|
||||
}
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
|
||||
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
|
||||
if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
|
||||
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var newMsgDocs []interface{}
|
||||
for _, msgDoc := range msgDocs {
|
||||
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
|
||||
continue
|
||||
}
|
||||
var index int64
|
||||
for index < int64(len(msgDoc.Msg)) {
|
||||
msg := msgDoc.Msg[index]
|
||||
if msg != nil && msg.Msg != nil {
|
||||
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
|
||||
end := index + m.model.GetSingleGocMsgNum()
|
||||
if int(end) >= len(msgDoc.Msg) {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:]
|
||||
} else {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:end]
|
||||
}
|
||||
newMsgDocs = append(newMsgDocs, msgDocModel)
|
||||
index = end
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
} else {
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -23,7 +23,7 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func GetNotificationConversationID(msg *sdkws.MsgData) string {
|
||||
func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string {
|
||||
switch msg.SessionType {
|
||||
case constant.SingleChatType:
|
||||
l := []string{msg.SendID, msg.RecvID}
|
||||
@ -114,6 +114,30 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetNotificationConversationIDByConversationID(conversationID string) string {
|
||||
l := strings.Split(conversationID, "_")
|
||||
if len(l) > 1 {
|
||||
l[0] = "n"
|
||||
return strings.Join(l, "_")
|
||||
} else {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func GetNotificationConversationID(sessionType int, ids ...string) string {
|
||||
sort.Strings(ids)
|
||||
if len(ids) > 2 || len(ids) < 1 {
|
||||
return ""
|
||||
}
|
||||
switch sessionType {
|
||||
case constant.SingleChatType:
|
||||
return "n_" + strings.Join(ids, "_") // single chat
|
||||
case constant.SuperGroupChatType:
|
||||
return "n_" + ids[0] // super group chat
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func IsNotification(conversationID string) bool {
|
||||
return strings.HasPrefix(conversationID, "n_")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user