refactoring scheduled tasks

This commit is contained in:
withchao 2024-12-20 15:26:32 +08:00
parent 86d58253c9
commit 46a8d17104
7 changed files with 56 additions and 33 deletions

4
go.mod
View File

@ -14,8 +14,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.66
github.com/openimsdk/tools v0.0.50-alpha.57
github.com/openimsdk/protocol v0.0.72-alpha.67
github.com/openimsdk/tools v0.0.50-alpha.58
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

8
go.sum
View File

@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.66 h1:5KoDY6M4T+pXg449ScF6hqeQ+WenBwNyUJn/t8W0oBQ=
github.com/openimsdk/protocol v0.0.72-alpha.66/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.57 h1:oIKV6vYhqp7TRmZ6Pe+r9RNl1D5s7aB/kE9yQVEWcSY=
github.com/openimsdk/tools v0.0.50-alpha.57/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0=
github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4=
github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -769,7 +769,8 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if err != nil {
return nil, err
}
for _, conversation := range conversations {
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
@ -778,17 +779,37 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if err != nil {
return nil, err
}
if resp.Seq == 0 {
if resp.Seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err
}
continue
}
_, err = c.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{
ConversationID: conversation.ConversationID,
OwnerUserID: []string{conversation.OwnerUserID},
MinSeq: resp.Seq + 1,
})
if err != nil {
resp.Seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
return nil, err
}
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
}
return &pbconversation.ClearUserConversationMsgResp{}, nil
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
}
func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error {
update := map[string]any{
"latest_msg_destruct_time": latestMsgDestructTime,
}
if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
return err
}
update["min_seq"] = minSeq
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil {
return err
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}

View File

@ -27,10 +27,11 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
if err != nil {
return nil, err
}
for _, doc := range docs {
for i, doc := range docs {
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
return nil, err
}
log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID)
index := strings.LastIndex(doc.DocID, ":")
if index < 0 {
continue
@ -51,9 +52,11 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
if conversationID == "" {
continue
}
minSeq++
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
return nil, err
}
log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq)
}
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}

View File

@ -295,22 +295,19 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO
if err != nil {
return nil, err
}
keyCount := make(map[string]int)
for _, obj := range models {
keyCount[obj.Key]++
}
for _, obj := range models {
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
if err != nil {
return nil, err
}
for i, obj := range models {
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil {
return nil, errs.Wrap(err)
}
if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil {
return nil, err
}
if int(count) <= keyCount[obj.Key] {
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count)
if count == 0 {
if err := t.s3.DeleteObject(ctx, obj.Key); err != nil {
return nil, err
}

View File

@ -1318,16 +1318,14 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str
{
"$match": bson.M{
"doc_id": bson.M{
"$regex": fmt.Sprintf("^%s:", conversationID),
"$regex": fmt.Sprintf("^%s", conversationID),
},
},
},
{
"$match": bson.M{
"msgs.msg.send_time": bson.M{
"msgs.msg.send_time": bson.M{
"$lte": time,
},
"$lte": time,
},
},
},
@ -1342,6 +1340,7 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str
{
"$project": bson.M{
"_id": 0,
"doc_id": 1,
"msgs.msg.send_time": 1,
"msgs.msg.seq": 1,
},
@ -1356,6 +1355,9 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str
}
var seq int64
for _, v := range res[0].Msg {
if v.Msg == nil {
continue
}
if v.Msg.SendTime <= time {
seq = v.Msg.Seq
}

View File

@ -95,13 +95,13 @@ func TestName4(t *testing.T) {
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
msg, err := NewConversationMongo(cli.Database("openim_v3"))
msg, err := NewMsgMongo(cli.Database("openim_v3"))
if err != nil {
panic(err)
}
ts := time.Now().UnixMilli()
ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli()
t.Log(ts)
res, err := msg.FindRandConversation(ctx, ts, 10)
res, err := msg.GetLastMessageSeqByTime(ctx, "sg_1523453548", ts)
if err != nil {
panic(err)
}