From 4fd31e436767888dfa88a19be4cd11394160799b Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 26 Dec 2022 10:25:42 +0800 Subject: [PATCH 01/32] getui --- internal/cron_task/clear_msg.go | 87 ++++++--------- internal/cron_task/clear_msg_test.go | 156 ++++++++++++++------------- internal/cron_task/cron_task.go | 12 +-- internal/push/getui/push.go | 4 +- 4 files changed, 115 insertions(+), 144 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 6e7db12d1..ae491734a 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -100,63 +100,46 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if err != nil { return 0, err } - return delStruct.getSetMinSeq(), nil + return delStruct.getSetMinSeq() + 1, nil } log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } - // lastMsgSendTime := msgs.Msg[len(msgs.Msg)-1].SendTime - - var hasMsgDoNotNeedDel bool - for i, msg := range msgs.Msg { - // 找到列表中不需要删除的消息了, 表示为递归到最后一个块 - if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { - log.NewDebug(operationID, ID, "find uid", msgs.UID) - // 删除块失败 递归结束 返回0 - hasMsgDoNotNeedDel = true - if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { - return 0, err - } - // unMarshall失败 块删除成功 设置为最小seq - msgPb := &server_api_params.MsgData{} - if err = proto.Unmarshal(msg.Msg, msgPb); err != nil { - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - // 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。 - if i > 0 { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - } - // 递归结束 - return msgPb.Seq, nil - } - } - // 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归 - // lastMsgPb := &server_api_params.MsgData{} - // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - // if err != nil { - // log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - // return 0, utils.Wrap(err, "proto.Unmarshal failed") - // } - // delStruct.minSeq = lastMsgPb.Seq - if msgListIsFull(msgs) { - log.NewDebug(operationID, "msg list is full", msgs.UID) + if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { delStruct.delUidList = append(delStruct.delUidList, msgs.UID) + lastMsgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + delStruct.minSeq = lastMsgPb.Seq } else { - // 列表没有满且没有不需要被删除的消息 代表他是最新的消息块 - if !hasMsgDoNotNeedDel { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1) + var hasMarkDelFlag bool + for _, msg := range msgs.Msg { + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1) - err = delMongoMsgsPhysical(delStruct.delUidList) - if err != nil { - return delStruct.getSetMinSeq(), err + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { + msgPb.Status = constant.MsgDeleted + bytes, _ := proto.Marshal(msgPb) + msg.Msg = bytes + msg.SendTime = 0 + hasMarkDelFlag = true + } else { + if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { + return 0, err } - return delStruct.getSetMinSeq(), nil + if hasMarkDelFlag { + if err := db.DB.UpdateOneMsgList(msgs); err != nil { + return delStruct.getSetMinSeq(), utils.Wrap(err, "") + } + } + return msgPb.Seq + 1, nil } } } @@ -179,14 +162,6 @@ func msgListIsFull(chat *db.UserChat) bool { return false } -func CheckGroupUserMinSeq(operationID, groupID, userID string) error { - return nil -} - -func CheckUserMinSeqWithMongo(operationID, userID string) error { - return nil -} - func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { var seqRedis uint64 var err error diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 0a2ba7071..88eac2df9 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -3,100 +3,102 @@ package cronTask import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/msg" server_api_params "Open_IM/pkg/proto/sdk_ws" - "Open_IM/pkg/utils" - "os/exec" + "context" + "fmt" + "strconv" + + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" "time" ) -func getMsgListFake(num int) []*pbMsg.MsgDataToMQ { - var msgList []*pbMsg.MsgDataToMQ - for i := 1; i < num; i++ { - msgList = append(msgList, &pbMsg.MsgDataToMQ{ - Token: "tk", - OperationID: "operationID", - MsgData: &server_api_params.MsgData{ - SendID: "sendID1", - RecvID: "recvID1", - GroupID: "", - ClientMsgID: "xxx", - ServerMsgID: "xxx", - SenderPlatformID: 1, - SenderNickname: "testNickName", - SenderFaceURL: "testFaceURL", - SessionType: 1, - MsgFrom: 100, - ContentType: 101, - Content: []byte("testFaceURL"), - Seq: uint32(i), - SendTime: time.Now().Unix(), - CreateTime: time.Now().Unix(), - Status: 1, - }, - }) +var ( + redisClient *redis.Client + mongoClient *mongo.Collection +) + +func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { + chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))} + for i := startSeq; i <= stopSeq; i++ { + msg := server_api_params.MsgData{ + SendID: "sendID1", + RecvID: "recvID1", + GroupID: "", + ClientMsgID: "xxx", + ServerMsgID: "xxx", + SenderPlatformID: 1, + SenderNickname: "testNickName", + SenderFaceURL: "testFaceURL", + SessionType: 1, + MsgFrom: 100, + ContentType: 101, + Content: []byte("testFaceURL"), + Seq: uint32(i), + SendTime: time.Now().Unix(), + CreateTime: time.Now().Unix(), + Status: 1, + } + bytes, _ := proto.Marshal(&msg) + sendTime := 0 + chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) } - return msgList + return chat +} + +func SetUserMaxSeq(userID string, seq int) error { + return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() +} + +func CreateChat(userChat *db.UserChat) error { + _, err := mongoClient.InsertOne(context.Background(), userChat) + return err } func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { - operationID := getCronTaskOperationID() + redisClient = redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:16379", + Password: "openIM123", // no password set + DB: 13, // use default DB + }) + mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", + "root", "openIM123", "127.0.0.1:37017", + "openIM", 100) + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri)) + mongoClient = client.Database("openIM").Collection("msg") testUID1 := "test_del_id1" //testUID2 := "test_del_id2" //testUID3 := "test_del_id3" //testUID4 := "test_del_id4" //testUID5 := "test_del_id5" //testUID6 := "test_del_id6" - testUserIDList := []string{testUID1} + err = SetUserMaxSeq(testUID1, 600) + userChat := GenUserChat(1, 500, 200, 0, testUID1) + err = CreateChat(userChat) - err := db.DB.SetUserMaxSeq(testUID1, 500) - err = db.DB.BatchInsertChat2DB(testUID1, getMsgListFake(500), testUID1+"-"+operationID, 500) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } + if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } if err != nil { - t.Error(err.Error(), testUID1) - } - //db.DB.SetUserMaxSeq(testUID1, 6000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 4999) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 30000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 9999) - //db.DB.BatchInsertChat2DB() - cmd := exec.Command("/bin/bash", "unset $CONFIG_NAME") - _, err = cmd.StdoutPipe() - if err != nil { - return - } - - //执行命令 - if err := cmd.Start(); err != nil { - return - } - for _, userID := range testUserIDList { - operationID = userID + "-" + operationID - if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } - if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } - } - - testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} - for _, groupID := range testWorkingGroupIDList { - operationID = groupID + "-" + operationID - log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) - if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } - if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } + t.Error("err is not nil", testUID1, err.Error()) } + // testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} + // for _, groupID := range testWorkingGroupIDList { + // operationID = groupID + "-" + operationID + // log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) + // if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { + // t.Error("checkMaxSeqWithMongo failed", groupID) + // } + // if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { + // t.Error("checkMaxSeqWithMongo failed", groupID) + // } + // } } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 9e4bcbfd4..484def4f5 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -8,8 +8,9 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "fmt" - "github.com/robfig/cron/v3" "time" + + "github.com/robfig/cron/v3" ) const cronTaskOperationID = "cronTaskOperationID-" @@ -57,6 +58,7 @@ func ClearAll() { } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } + // working group msg clear workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { @@ -77,9 +79,6 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } - if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), userID, err) - } } } @@ -98,10 +97,5 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } - for _, userID := range userIDList { - if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) - } - } } } diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 0c337dba8..783f62f54 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -264,8 +264,8 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr func (pushReq *PushReq) setPushChannel(title string, body string) { pushReq.PushChannel = &PushChannel{} - autoBadge := "+1" - pushReq.PushChannel.Ios = &Ios{AutoBadge: &autoBadge} + // autoBadge := "+1" + pushReq.PushChannel.Ios = &Ios{} notify := "notify" pushReq.PushChannel.Ios.NotiType = ¬ify pushReq.PushChannel.Ios.Aps.Sound = "default" From e588091bf6d060a934a11f08361165e948460de7 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 27 Dec 2022 11:01:21 +0800 Subject: [PATCH 02/32] mongodb --- pkg/common/db/mongoModel.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 8ae026522..4d7d01f80 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -10,13 +10,14 @@ import ( "context" "errors" "fmt" + "math/rand" + "sync" + "github.com/go-redis/redis/v8" "github.com/gogo/protobuf/sortkeys" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "math/rand" - "sync" //"github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" @@ -206,6 +207,13 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat return nil } +func (d *DataBases) UpdateOneMsgList(msg *UserChat) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + _, err := c.UpdateOne(ctx, bson.M{"uid": msg.UID}, bson.M{"$set": bson.M{"msg": msg.Msg}}) + return err +} + func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList) var hasSeqList []uint32 From 0e62deed0a0a6fe96e8618f7de4616aaa6371a7e Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 27 Dec 2022 22:27:44 +0800 Subject: [PATCH 03/32] test cron --- internal/cron_task/clear_msg_test.go | 32 ++++++++++++++++------------ internal/cron_task/test/main.go | 6 ------ internal/push/getui/push.go | 4 ++-- 3 files changed, 20 insertions(+), 22 deletions(-) delete mode 100644 internal/cron_task/test/main.go diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 88eac2df9..e70436875 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" server_api_params "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" "context" "fmt" "strconv" @@ -24,7 +25,7 @@ var ( func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))} - for i := startSeq; i <= stopSeq; i++ { + for i := startSeq; i < stopSeq; i++ { msg := server_api_params.MsgData{ SendID: "sendID1", RecvID: "recvID1", @@ -44,7 +45,12 @@ func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.Use Status: 1, } bytes, _ := proto.Marshal(&msg) - sendTime := 0 + var sendTime int64 + if i <= delSeq { + sendTime = 10000 + } else { + sendTime = utils.GetCurrentTimestampByMill() + } chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) } return chat @@ -54,6 +60,12 @@ func SetUserMaxSeq(userID string, seq int) error { return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() } +func GetUserMinSeq(userID string) (uint64, error) { + key := "REDIS_USER_MIN_SEQ:" + userID + seq, err := redisClient.Get(context.Background(), key).Result() + return uint64(utils.StringToInt(seq)), err +} + func CreateChat(userChat *db.UserChat) error { _, err := mongoClient.InsertOne(context.Background(), userChat) return err @@ -80,25 +92,17 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { err = SetUserMaxSeq(testUID1, 600) userChat := GenUserChat(1, 500, 200, 0, testUID1) err = CreateChat(userChat) - if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil { t.Error("checkMaxSeqWithMongo failed", testUID1) } if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil { t.Error("checkMaxSeqWithMongo failed", testUID1) } + minSeq, err := GetUserMinSeq(testUID1) if err != nil { t.Error("err is not nil", testUID1, err.Error()) } - // testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} - // for _, groupID := range testWorkingGroupIDList { - // operationID = groupID + "-" + operationID - // log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) - // if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { - // t.Error("checkMaxSeqWithMongo failed", groupID) - // } - // if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { - // t.Error("checkMaxSeqWithMongo failed", groupID) - // } - // } + if minSeq != 201 { + t.Error("is not the same") + } } diff --git a/internal/cron_task/test/main.go b/internal/cron_task/test/main.go deleted file mode 100644 index 5a2b3f44a..000000000 --- a/internal/cron_task/test/main.go +++ /dev/null @@ -1,6 +0,0 @@ -package main - -// -//func main() { -// db.DB.BatchInsertChat() -//} diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 783f62f54..0c337dba8 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -264,8 +264,8 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr func (pushReq *PushReq) setPushChannel(title string, body string) { pushReq.PushChannel = &PushChannel{} - // autoBadge := "+1" - pushReq.PushChannel.Ios = &Ios{} + autoBadge := "+1" + pushReq.PushChannel.Ios = &Ios{AutoBadge: &autoBadge} notify := "notify" pushReq.PushChannel.Ios.NotiType = ¬ify pushReq.PushChannel.Ios.Aps.Sound = "default" From 2608ae58df7528d5268f744fd0f58d2eedd780e7 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 10:55:00 +0800 Subject: [PATCH 04/32] test cron --- internal/cron_task/clear_msg_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index e70436875..60a01d2e2 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -24,8 +24,8 @@ var ( ) func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { - chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))} - for i := startSeq; i < stopSeq; i++ { + chat := &db.UserChat{UID: userID + ":" + strconv.Itoa(int(index))} + for i := startSeq; i <= stopSeq; i++ { msg := server_api_params.MsgData{ SendID: "sendID1", RecvID: "recvID1", From 40fc3e6c08f70de451a0304215a68cf1803bf88b Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 10:58:30 +0800 Subject: [PATCH 05/32] test cron --- internal/cron_task/clear_msg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 60a01d2e2..ff9cac5ae 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -76,7 +76,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { redisClient = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:16379", Password: "openIM123", // no password set - DB: 13, // use default DB + DB: 0, // use default DB }) mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", "root", "openIM123", "127.0.0.1:37017", From 1706f5df07b30b36952f3ff6b2a850ddcc30718a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 11:02:14 +0800 Subject: [PATCH 06/32] test cron --- internal/cron_task/clear_msg.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index ae491734a..afed8e585 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -117,7 +117,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs delStruct.minSeq = lastMsgPb.Seq } else { var hasMarkDelFlag bool - for _, msg := range msgs.Msg { + for index, msg := range msgs.Msg { msgPb := &server_api_params.MsgData{} err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { @@ -135,6 +135,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return 0, err } if hasMarkDelFlag { + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb) if err := db.DB.UpdateOneMsgList(msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } From f9fdd540f958f336e799e3f8517a5a5c16f985df Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 11:03:12 +0800 Subject: [PATCH 07/32] test cron --- internal/cron_task/clear_msg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index ff9cac5ae..87d735438 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -103,6 +103,6 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID1, err.Error()) } if minSeq != 201 { - t.Error("is not the same") + t.Error("is not the same", "minSeq:", minSeq, "targetSeq", 201) } } From a6bd8e5205917d1e1d8b616036f00281f63ca9db Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 11:15:15 +0800 Subject: [PATCH 08/32] test cron --- internal/cron_task/clear_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index afed8e585..27954ce59 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -140,7 +140,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return delStruct.getSetMinSeq(), utils.Wrap(err, "") } } - return msgPb.Seq + 1, nil + return msgPb.Seq, nil } } } From ac99da044a2dda3a0e6bbf8877a253cd1d2d2712 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 11:33:49 +0800 Subject: [PATCH 09/32] test cron --- internal/cron_task/clear_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 27954ce59..f764b792b 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -135,7 +135,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return 0, err } if hasMarkDelFlag { - log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb) + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID) if err := db.DB.UpdateOneMsgList(msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } From 458f872fe3f8ce6d2b38ee49e5c859101850c1f1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 11:47:39 +0800 Subject: [PATCH 10/32] test cron --- internal/cron_task/clear_msg.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index f764b792b..c08dbf0e3 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -130,6 +130,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs msg.Msg = bytes msg.SendTime = 0 hasMarkDelFlag = true + log.NewDebug(operationID, ID, msgPb.Seq) } else { if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { return 0, err From cff9ee39be6212c7c3e5900492182f1f83d7bbd8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 12:02:56 +0800 Subject: [PATCH 11/32] test cron --- internal/cron_task/clear_msg.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index c08dbf0e3..06a279de7 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -130,13 +130,12 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs msg.Msg = bytes msg.SendTime = 0 hasMarkDelFlag = true - log.NewDebug(operationID, ID, msgPb.Seq) } else { if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { return 0, err } if hasMarkDelFlag { - log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID) + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID, msgs.Msg) if err := db.DB.UpdateOneMsgList(msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } From 1fdc4d06d1c5bf3ae7f4a6d9022e22892a02d5cb Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 12:10:10 +0800 Subject: [PATCH 12/32] test cron --- internal/cron_task/clear_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 06a279de7..944183b08 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -135,7 +135,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return 0, err } if hasMarkDelFlag { - log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID, msgs.Msg) + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID, msgs.Msg[0].SendTime) if err := db.DB.UpdateOneMsgList(msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } From 71ad683614c33eebce14a526dec5550da51b8435 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 12:42:17 +0800 Subject: [PATCH 13/32] test cron --- internal/cron_task/clear_msg.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 944183b08..f6ea7cd1f 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -124,18 +124,18 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) return 0, utils.Wrap(err, "proto.Unmarshal failed") } - if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { + if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) && msg.SendTime != 0 { msgPb.Status = constant.MsgDeleted bytes, _ := proto.Marshal(msgPb) - msg.Msg = bytes - msg.SendTime = 0 + msgs.Msg[index].Msg = bytes + msgs.Msg[index].SendTime = 0 hasMarkDelFlag = true } else { if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { return 0, err } if hasMarkDelFlag { - log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID, msgs.Msg[0].SendTime) + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID) if err := db.DB.UpdateOneMsgList(msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } From 7033b603ca07e493cdf3bb28b2a905bd674703e7 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 13:56:07 +0800 Subject: [PATCH 14/32] test cron --- internal/cron_task/clear_msg.go | 4 +-- internal/cron_task/clear_msg_test.go | 44 +++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index f6ea7cd1f..36239c8fb 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -114,7 +114,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) return 0, utils.Wrap(err, "proto.Unmarshal failed") } - delStruct.minSeq = lastMsgPb.Seq + delStruct.minSeq = lastMsgPb.Seq + 1 } else { var hasMarkDelFlag bool for index, msg := range msgs.Msg { @@ -124,7 +124,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) return 0, utils.Wrap(err, "proto.Unmarshal failed") } - if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) && msg.SendTime != 0 { + if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { msgPb.Status = constant.MsgDeleted bytes, _ := proto.Marshal(msgPb) msgs.Msg[index].Msg = bytes diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 87d735438..678bfe1f4 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -84,13 +84,12 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri)) mongoClient = client.Database("openIM").Collection("msg") testUID1 := "test_del_id1" - //testUID2 := "test_del_id2" - //testUID3 := "test_del_id3" + //testUID4 := "test_del_id4" //testUID5 := "test_del_id5" //testUID6 := "test_del_id6" err = SetUserMaxSeq(testUID1, 600) - userChat := GenUserChat(1, 500, 200, 0, testUID1) + userChat := GenUserChat(1, 600, 200, 0, testUID1) err = CreateChat(userChat) if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil { t.Error("checkMaxSeqWithMongo failed", testUID1) @@ -103,6 +102,43 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID1, err.Error()) } if minSeq != 201 { - t.Error("is not the same", "minSeq:", minSeq, "targetSeq", 201) + t.Error("test1 is not the same", "minSeq:", minSeq, "targetSeq", 201) + } + + testUID2 := "test_del_id2" + err = SetUserMaxSeq(testUID2, 7000) + userChat = GenUserChat(1, 4999, 5000, 0, testUID2) + userChat2 := GenUserChat(5000, 7000, 6000, 1, testUID2) + err = CreateChat(userChat) + err = CreateChat(userChat2) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID2); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID2) + } + if err := checkMaxSeqWithMongo(operationID, testUID2, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID2) + } + minSeq, err = GetUserMinSeq(testUID2) + if err != nil { + t.Error("err is not nil", testUID2, err.Error()) + } + if minSeq != 6001 { + t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 201) + } + + testUID3 := "test_del_id3" + err = SetUserMaxSeq(testUID3, 4999) + userChat = GenUserChat(1, 4999, 5000, 0, testUID3) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID3); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID3) + } + if err := checkMaxSeqWithMongo(operationID, testUID3, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID3) + } + minSeq, err = GetUserMinSeq(testUID3) + if err != nil { + t.Error("err is not nil", testUID3, err.Error()) + } + if minSeq != 5000 { + t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 201) } } From 3d9355848b4f92430cbabb9dbaa3bcb570eb75e8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 14:13:16 +0800 Subject: [PATCH 15/32] test cron --- internal/cron_task/clear_msg_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 678bfe1f4..1c730d322 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -122,7 +122,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID2, err.Error()) } if minSeq != 6001 { - t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 201) + t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 6001) } testUID3 := "test_del_id3" @@ -139,6 +139,6 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID3, err.Error()) } if minSeq != 5000 { - t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 201) + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 201) } } From 5888dc3b5dd86d3ad112be4f910ac6f70ebf92a0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 14:20:17 +0800 Subject: [PATCH 16/32] test cron --- internal/cron_task/clear_msg_test.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 1c730d322..627ba1431 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -128,6 +128,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { testUID3 := "test_del_id3" err = SetUserMaxSeq(testUID3, 4999) userChat = GenUserChat(1, 4999, 5000, 0, testUID3) + err = CreateChat(userChat) if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID3); err != nil { t.Error("checkMaxSeqWithMongo failed", testUID3) } @@ -139,6 +140,28 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID3, err.Error()) } if minSeq != 5000 { - t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 201) + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 5000) + } + + testUID4 := "test_del_id4" + err = SetUserMaxSeq(testUID4, 12000) + userChat = GenUserChat(1, 4999, 5000, 0, testUID4) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID4) + userChat3 := GenUserChat(10000, 12000, 11000, 2, testUID4) + err = CreateChat(userChat) + err = CreateChat(userChat2) + err = CreateChat(userChat3) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID4); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + if err := checkMaxSeqWithMongo(operationID, testUID4, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + minSeq, err = GetUserMinSeq(testUID4) + if err != nil { + t.Error("err is not nil", testUID4, err.Error()) + } + if minSeq != 11001 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11001) } } From 7bf7686314eacdcaab7ae3a95f039fec212554de Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 14:54:22 +0800 Subject: [PATCH 17/32] test cron --- internal/cron_task/clear_msg.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 36239c8fb..a2240396c 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -115,6 +115,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs return 0, utils.Wrap(err, "proto.Unmarshal failed") } delStruct.minSeq = lastMsgPb.Seq + 1 + log.NewDebug(operationID, utils.GetSelfFuncName(), msgs.UID, "add to delUidList", "minSeq", lastMsgPb.Seq+1) } else { var hasMarkDelFlag bool for index, msg := range msgs.Msg { From 45293152bb8503cbece188c4374d9ab0b8ec6a24 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 15:20:04 +0800 Subject: [PATCH 18/32] test cron --- internal/cron_task/clear_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index a2240396c..60812b074 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -106,7 +106,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } - if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { + if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) < utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { delStruct.delUidList = append(delStruct.delUidList, msgs.UID) lastMsgPb := &server_api_params.MsgData{} err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) From cdd86349d073a54e9d15e69f1046496c7cb662aa Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 15:27:04 +0800 Subject: [PATCH 19/32] test cron --- internal/cron_task/clear_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 60812b074..5ae3b16c6 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -100,7 +100,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if err != nil { return 0, err } - return delStruct.getSetMinSeq() + 1, nil + return delStruct.getSetMinSeq(), nil } log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) if len(msgs.Msg) > db.GetSingleGocMsgNum() { From 01b2de80ae732c898dbcf61227858526d283bd92 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 15:33:53 +0800 Subject: [PATCH 20/32] test cron --- internal/cron_task/clear_msg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 627ba1431..ec63e1abe 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -162,6 +162,6 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID4, err.Error()) } if minSeq != 11001 { - t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11001) + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11002) } } From e420871e1f3828ddb0ebdcd1484e5079cbef05c7 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 28 Dec 2022 15:38:27 +0800 Subject: [PATCH 21/32] agree user join group --- internal/rpc/group/group.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 4d68582e5..8ac6960e2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -862,6 +862,10 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G log.NewError(req.OperationID, "GroupApplicationResponse failed ", err.Error(), req.FromUserID) return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } + if imdb.IsExistGroupMember(req.GroupID, req.FromUserID) { + log.NewInfo(req.OperationID, "GroupApplicationResponse user in group", req.GroupID, req.FromUserID) + return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{}}, nil + } member := db.GroupMember{} member.GroupID = req.GroupID member.UserID = req.FromUserID From 71a2090363508b344a1daf4ec6223d40e82be4cb Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 15:41:04 +0800 Subject: [PATCH 22/32] test cron --- internal/cron_task/clear_msg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index ec63e1abe..a68ac7125 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -161,7 +161,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { if err != nil { t.Error("err is not nil", testUID4, err.Error()) } - if minSeq != 11001 { + if minSeq != 11002 { t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11002) } } From 0a5783092510cff952ce13babfc6a7a1e4924ef2 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 15:42:52 +0800 Subject: [PATCH 23/32] test cron --- internal/cron_task/clear_msg_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index a68ac7125..627ba1431 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -161,7 +161,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { if err != nil { t.Error("err is not nil", testUID4, err.Error()) } - if minSeq != 11002 { - t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11002) + if minSeq != 11001 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11001) } } From 4618bf990dbe3a84ee0e9d78628471289ce58a9c Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 28 Dec 2022 16:17:54 +0800 Subject: [PATCH 24/32] invite user to group --- internal/rpc/group/group.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 8ac6960e2..9f0d8716b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -365,6 +365,14 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite var resp pbGroup.InviteUserToGroupResp joinReq := pbGroup.JoinGroupReq{} for _, v := range req.InvitedUserIDList { + if imdb.IsExistGroupMember(req.GroupID, v) { + log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v) + var resultNode pbGroup.Id2Result + resultNode.Result = -1 + resultNode.UserID = v + resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) + continue + } var groupRequest db.GroupRequest groupRequest.UserID = v groupRequest.GroupID = req.GroupID @@ -452,8 +460,19 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) } } else { - okUserIDList = req.InvitedUserIDList - if err := db.DB.AddUserToSuperGroup(req.GroupID, req.InvitedUserIDList); err != nil { + for _, v := range req.InvitedUserIDList { + if imdb.IsExistGroupMember(req.GroupID, v) { + log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v) + var resultNode pbGroup.Id2Result + resultNode.Result = -1 + resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) + continue + } else { + okUserIDList = append(okUserIDList, v) + } + } + //okUserIDList = req.InvitedUserIDList + if err := db.DB.AddUserToSuperGroup(req.GroupID, okUserIDList); err != nil { log.NewError(req.OperationID, "AddUserToSuperGroup failed ", req.GroupID, err) return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil } From 235be5da0cf900532011e06e46491fb0ed740878 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 17:26:53 +0800 Subject: [PATCH 25/32] cron --- script/check_all.sh | 20 ++++++++++---------- script/start_all.sh | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/script/check_all.sh b/script/check_all.sh index 76535c79f..af3d44ceb 100644 --- a/script/check_all.sh +++ b/script/check_all.sh @@ -54,13 +54,13 @@ else fi -#check=$(ps aux | grep -w ./${cron_task_name} | grep -v grep | wc -l) -#if [ $check -ge 1 ]; then -# echo -e ${GREEN_PREFIX}"none port has been listening,belongs service is openImCronTask"${COLOR_SUFFIX} -#else -# echo -e ${RED_PREFIX}"cron_task_name service does not start normally"${COLOR_SUFFIX} -# echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} -# exit -1 -#fi -# -#echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} +check=$(ps aux | grep -w ./${cron_task_name} | grep -v grep | wc -l) +if [ $check -ge 1 ]; then + echo -e ${GREEN_PREFIX}"none port has been listening,belongs service is openImCronTask"${COLOR_SUFFIX} +else + echo -e ${RED_PREFIX}"cron_task_name service does not start normally"${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} + exit -1 +fi + +echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} diff --git a/script/start_all.sh b/script/start_all.sh index 223187973..4a5f7d65e 100644 --- a/script/start_all.sh +++ b/script/start_all.sh @@ -10,7 +10,7 @@ need_to_start_server_shell=( sdk_svr_start.sh msg_gateway_start.sh demo_svr_start.sh -# start_cron.sh + start_cron.sh ) time=`date +"%Y-%m-%d %H:%M:%S"` echo "==========================================================">>../logs/openIM.log 2>&1 & From ee9c3cbde3807e678fe11eb7ee3f56410ff0a839 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 17:49:41 +0800 Subject: [PATCH 26/32] cron --- internal/cron_task/clear_msg_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 627ba1431..539547f41 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -164,4 +164,24 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { if minSeq != 11001 { t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11001) } + + testUID5 := "test_del_id5" + err = SetUserMaxSeq(testUID4, 9999) + userChat = GenUserChat(1, 4999, 5000, 0, testUID5) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID5) + err = CreateChat(userChat) + err = CreateChat(userChat2) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID5); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + if err := checkMaxSeqWithMongo(operationID, testUID5, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID5) + } + minSeq, err = GetUserMinSeq(testUID5) + if err != nil { + t.Error("err is not nil", testUID5, err.Error()) + } + if minSeq != 10000 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 10000) + } } From 1e2a60c802347ef228bfb8eaba25db659a949568 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 28 Dec 2022 17:56:13 +0800 Subject: [PATCH 27/32] join group --- internal/rpc/group/group.go | 5 ++++- .../db/mysql_model/im_mysql_model/group_request_model.go | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 9f0d8716b..bc623c0d8 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1000,12 +1000,15 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) (*pbGroup.JoinGroupResp, error) { log.NewInfo(req.OperationID, "JoinGroup args ", req.String()) + if imdb.IsExistGroupMember(req.GroupID, req.OpUserID) { + log.NewInfo(req.OperationID, "IsExistGroupMember", req.GroupID, req.OpUserID) + return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{}}, nil + } _, err := imdb.GetUserByUserID(req.OpUserID) if err != nil { log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OpUserID) return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.GroupID, err) diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go index a9dd6d98e..f6f5b2637 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go @@ -27,7 +27,9 @@ func UpdateGroupRequest(groupRequest db.GroupRequest) error { } func InsertIntoGroupRequest(toInsertInfo db.GroupRequest) error { - DelGroupRequestByGroupIDAndUserID(toInsertInfo.GroupID, toInsertInfo.UserID) + if err := DelGroupRequestByGroupIDAndUserID(toInsertInfo.GroupID, toInsertInfo.UserID); err != nil { + return err + } if toInsertInfo.HandledTime.Unix() < 0 { toInsertInfo.HandledTime = utils.UnixSecondToTime(0) } @@ -70,7 +72,7 @@ func GetGroupRequestByGroupID(groupID string) ([]db.GroupRequest, error) { return groupRequestList, nil } -//received +// received func GetGroupApplicationList(userID string) ([]db.GroupRequest, error) { var groupRequestList []db.GroupRequest memberList, err := GetGroupMemberListByUserID(userID) From d1f3266383ae261ef9f929863539d7999ce8ddab Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 18:00:09 +0800 Subject: [PATCH 28/32] cron --- internal/cron_task/clear_msg_test.go | 53 ++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 539547f41..465216ce9 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/protobuf/proto" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "gopkg.in/mgo.v2/bson" "testing" "time" @@ -71,6 +72,11 @@ func CreateChat(userChat *db.UserChat) error { return err } +func DelChat(uid string, index int) error { + _, err := mongoClient.DeleteOne(context.Background(), bson.M{"uid": uid + ":" + strconv.Itoa(index)}) + return err +} + func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { operationID := getCronTaskOperationID() redisClient = redis.NewClient(&redis.Options{ @@ -84,10 +90,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri)) mongoClient = client.Database("openIM").Collection("msg") testUID1 := "test_del_id1" - - //testUID4 := "test_del_id4" - //testUID5 := "test_del_id5" - //testUID6 := "test_del_id6" + err = DelChat(testUID1, 0) err = SetUserMaxSeq(testUID1, 600) userChat := GenUserChat(1, 600, 200, 0, testUID1) err = CreateChat(userChat) @@ -106,11 +109,14 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } testUID2 := "test_del_id2" + err = DelChat(testUID2, 0) + err = DelChat(testUID2, 1) err = SetUserMaxSeq(testUID2, 7000) userChat = GenUserChat(1, 4999, 5000, 0, testUID2) userChat2 := GenUserChat(5000, 7000, 6000, 1, testUID2) err = CreateChat(userChat) err = CreateChat(userChat2) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID2); err != nil { t.Error("checkMaxSeqWithMongo failed", testUID2) } @@ -126,6 +132,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } testUID3 := "test_del_id3" + err = DelChat(testUID3, 0) err = SetUserMaxSeq(testUID3, 4999) userChat = GenUserChat(1, 4999, 5000, 0, testUID3) err = CreateChat(userChat) @@ -144,6 +151,9 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } testUID4 := "test_del_id4" + err = DelChat(testUID4, 0) + err = DelChat(testUID4, 1) + err = DelChat(testUID4, 2) err = SetUserMaxSeq(testUID4, 12000) userChat = GenUserChat(1, 4999, 5000, 0, testUID4) userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID4) @@ -162,11 +172,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID4, err.Error()) } if minSeq != 11001 { - t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 11001) + t.Error("test4 is not the same", "minSeq:", minSeq, "targetSeq", 11001) } testUID5 := "test_del_id5" - err = SetUserMaxSeq(testUID4, 9999) + err = DelChat(testUID5, 0) + err = DelChat(testUID5, 1) + err = SetUserMaxSeq(testUID5, 9999) userChat = GenUserChat(1, 4999, 5000, 0, testUID5) userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID5) err = CreateChat(userChat) @@ -182,6 +194,33 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("err is not nil", testUID5, err.Error()) } if minSeq != 10000 { - t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 10000) + t.Error("test5 is not the same", "minSeq:", minSeq, "targetSeq", 10000) + } + + testUID6 := "test_del_id6" + err = DelChat(testUID5, 0) + err = DelChat(testUID5, 1) + err = DelChat(testUID5, 2) + err = DelChat(testUID5, 3) + userChat = GenUserChat(1, 4999, 5000, 0, testUID6) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID6) + userChat3 = GenUserChat(10000, 14999, 13000, 2, testUID6) + userChat4 := GenUserChat(15000, 19999, 0, 3, testUID6) + err = CreateChat(userChat) + err = CreateChat(userChat2) + err = CreateChat(userChat3) + err = CreateChat(userChat4) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID6); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID6) + } + if err := checkMaxSeqWithMongo(operationID, testUID6, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID6) + } + minSeq, err = GetUserMinSeq(testUID6) + if err != nil { + t.Error("err is not nil", testUID6, err.Error()) + } + if minSeq != 13001 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 13001) } } From 9dac12cf2bcdf8d35e32853d69e2ff0dfb6799ef Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 18:56:49 +0800 Subject: [PATCH 29/32] cron --- config/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.yaml b/config/config.yaml index 61aadf2b3..7980eaa6f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -36,7 +36,7 @@ mongo: dbPassword: #mongo密码,建议先不设置 dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 - chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * + chatRecordsClearTime: "0 3 * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * redis: dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 From 2aa49f080e61e48e1c398bf455fe6f367dec1b14 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Dec 2022 19:19:26 +0800 Subject: [PATCH 30/32] cron --- internal/cron_task/clear_msg.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 5ae3b16c6..08ef74cef 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -119,6 +119,9 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs } else { var hasMarkDelFlag bool for index, msg := range msgs.Msg { + if msg.SendTime == 0 { + continue + } msgPb := &server_api_params.MsgData{} err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { From 972dd5f5312b5136bf6f653e0947a98690cb556b Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 29 Dec 2022 11:00:41 +0800 Subject: [PATCH 31/32] fix bug --- internal/push/logic/push_to_client.go | 6 ++++++ internal/rpc/msg/send_msg.go | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 9b3475248..7872a2d4d 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -79,6 +79,9 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { return } } + if pushMsg.MsgData.ContentType > constant.NotificationBegin && pushMsg.MsgData.ContentType < constant.NotificationEnd && pushMsg.MsgData.ContentType != constant.SignalingNotification { + return + } if pushMsg.MsgData.ContentType == constant.SignalingNotification { isSend, err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData, pushMsg.PushToUserID) if err != nil { @@ -198,6 +201,9 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) successCount++ if isOfflinePush { + if pushMsg.MsgData.ContentType > constant.NotificationBegin && pushMsg.MsgData.ContentType < constant.NotificationEnd && pushMsg.MsgData.ContentType != constant.SignalingNotification { + return + } var onlineSuccessUserIDList []string onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID) for _, v := range wsResult { diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index b3a3cb745..39380b003 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -25,6 +25,7 @@ import ( "time" promePkg "Open_IM/pkg/common/prometheus" + go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" ) @@ -125,7 +126,7 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { return true, 0, "", nil } - if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { + if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType != constant.SignalingNotification { return true, 0, "", nil } log.NewDebug(data.OperationID, *config.Config.MessageVerify.FriendVerify) From 78d7588af7ea8262e785c6d91456253636dc4e83 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 29 Dec 2022 19:10:43 +0800 Subject: [PATCH 32/32] kfk log --- pkg/common/kafka/consumer_group.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 97a0f5d6e..57017136c 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,6 +8,8 @@ package kafka import ( "context" + "fmt" + "github.com/Shopify/sarama" ) @@ -31,6 +33,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str //fmt.Println("init address is ", addrs, "topics is ", topics) consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if err != nil { + fmt.Println("args:", addrs, groupID, config) panic(err.Error()) } return &MConsumerGroup{