diff --git a/config/config.yaml b/config/config.yaml index 61aadf2b3..7980eaa6f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -36,7 +36,7 @@ mongo: dbPassword: #mongo密码,建议先不设置 dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 - chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * + chatRecordsClearTime: "0 3 * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * redis: dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 6e7db12d1..08ef74cef 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -106,57 +106,45 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } - // lastMsgSendTime := msgs.Msg[len(msgs.Msg)-1].SendTime - - var hasMsgDoNotNeedDel bool - for i, msg := range msgs.Msg { - // 找到列表中不需要删除的消息了, 表示为递归到最后一个块 - if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { - log.NewDebug(operationID, ID, "find uid", msgs.UID) - // 删除块失败 递归结束 返回0 - hasMsgDoNotNeedDel = true - if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { - return 0, err - } - // unMarshall失败 块删除成功 设置为最小seq - msgPb := &server_api_params.MsgData{} - if err = proto.Unmarshal(msg.Msg, msgPb); err != nil { - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - // 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。 - if i > 0 { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - } - // 递归结束 - return msgPb.Seq, nil - } - } - // 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归 - // lastMsgPb := &server_api_params.MsgData{} - // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - // if err != nil { - // log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - // return 0, utils.Wrap(err, "proto.Unmarshal failed") - // } - // delStruct.minSeq = lastMsgPb.Seq - if msgListIsFull(msgs) { - log.NewDebug(operationID, "msg list is full", msgs.UID) + if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) < utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) { delStruct.delUidList = append(delStruct.delUidList, msgs.UID) + lastMsgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + delStruct.minSeq = lastMsgPb.Seq + 1 + log.NewDebug(operationID, utils.GetSelfFuncName(), msgs.UID, "add to delUidList", "minSeq", lastMsgPb.Seq+1) } else { - // 列表没有满且没有不需要被删除的消息 代表他是最新的消息块 - if !hasMsgDoNotNeedDel { - delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1) + var hasMarkDelFlag bool + for index, msg := range msgs.Msg { + if msg.SendTime == 0 { + continue + } + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1) - err = delMongoMsgsPhysical(delStruct.delUidList) - if err != nil { - return delStruct.getSetMinSeq(), err + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + if utils.GetCurrentTimestampByMill() > msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { + msgPb.Status = constant.MsgDeleted + bytes, _ := proto.Marshal(msgPb) + msgs.Msg[index].Msg = bytes + msgs.Msg[index].SendTime = 0 + hasMarkDelFlag = true + } else { + if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { + return 0, err } - return delStruct.getSetMinSeq(), nil + if hasMarkDelFlag { + log.NewInfo(operationID, ID, "hasMarkDelFlag", "index:", index, "msgPb:", msgPb, msgs.UID) + if err := db.DB.UpdateOneMsgList(msgs); err != nil { + return delStruct.getSetMinSeq(), utils.Wrap(err, "") + } + } + return msgPb.Seq, nil } } } @@ -179,14 +167,6 @@ func msgListIsFull(chat *db.UserChat) bool { return false } -func CheckGroupUserMinSeq(operationID, groupID, userID string) error { - return nil -} - -func CheckUserMinSeqWithMongo(operationID, userID string) error { - return nil -} - func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { var seqRedis uint64 var err error diff --git a/internal/cron_task/clear_msg_test.go b/internal/cron_task/clear_msg_test.go index 0a2ba7071..465216ce9 100644 --- a/internal/cron_task/clear_msg_test.go +++ b/internal/cron_task/clear_msg_test.go @@ -3,100 +3,224 @@ package cronTask import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/msg" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" - "os/exec" + "context" + "fmt" + "strconv" + + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "gopkg.in/mgo.v2/bson" + "testing" "time" ) -func getMsgListFake(num int) []*pbMsg.MsgDataToMQ { - var msgList []*pbMsg.MsgDataToMQ - for i := 1; i < num; i++ { - msgList = append(msgList, &pbMsg.MsgDataToMQ{ - Token: "tk", - OperationID: "operationID", - MsgData: &server_api_params.MsgData{ - SendID: "sendID1", - RecvID: "recvID1", - GroupID: "", - ClientMsgID: "xxx", - ServerMsgID: "xxx", - SenderPlatformID: 1, - SenderNickname: "testNickName", - SenderFaceURL: "testFaceURL", - SessionType: 1, - MsgFrom: 100, - ContentType: 101, - Content: []byte("testFaceURL"), - Seq: uint32(i), - SendTime: time.Now().Unix(), - CreateTime: time.Now().Unix(), - Status: 1, - }, - }) +var ( + redisClient *redis.Client + mongoClient *mongo.Collection +) + +func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat { + chat := &db.UserChat{UID: userID + ":" + strconv.Itoa(int(index))} + for i := startSeq; i <= stopSeq; i++ { + msg := server_api_params.MsgData{ + SendID: "sendID1", + RecvID: "recvID1", + GroupID: "", + ClientMsgID: "xxx", + ServerMsgID: "xxx", + SenderPlatformID: 1, + SenderNickname: "testNickName", + SenderFaceURL: "testFaceURL", + SessionType: 1, + MsgFrom: 100, + ContentType: 101, + Content: []byte("testFaceURL"), + Seq: uint32(i), + SendTime: time.Now().Unix(), + CreateTime: time.Now().Unix(), + Status: 1, + } + bytes, _ := proto.Marshal(&msg) + var sendTime int64 + if i <= delSeq { + sendTime = 10000 + } else { + sendTime = utils.GetCurrentTimestampByMill() + } + chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) } - return msgList + return chat +} + +func SetUserMaxSeq(userID string, seq int) error { + return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() +} + +func GetUserMinSeq(userID string) (uint64, error) { + key := "REDIS_USER_MIN_SEQ:" + userID + seq, err := redisClient.Get(context.Background(), key).Result() + return uint64(utils.StringToInt(seq)), err +} + +func CreateChat(userChat *db.UserChat) error { + _, err := mongoClient.InsertOne(context.Background(), userChat) + return err +} + +func DelChat(uid string, index int) error { + _, err := mongoClient.DeleteOne(context.Background(), bson.M{"uid": uid + ":" + strconv.Itoa(index)}) + return err } func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { - operationID := getCronTaskOperationID() + redisClient = redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:16379", + Password: "openIM123", // no password set + DB: 0, // use default DB + }) + mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", + "root", "openIM123", "127.0.0.1:37017", + "openIM", 100) + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri)) + mongoClient = client.Database("openIM").Collection("msg") testUID1 := "test_del_id1" - //testUID2 := "test_del_id2" - //testUID3 := "test_del_id3" - //testUID4 := "test_del_id4" - //testUID5 := "test_del_id5" - //testUID6 := "test_del_id6" - testUserIDList := []string{testUID1} - - err := db.DB.SetUserMaxSeq(testUID1, 500) - err = db.DB.BatchInsertChat2DB(testUID1, getMsgListFake(500), testUID1+"-"+operationID, 500) + err = DelChat(testUID1, 0) + err = SetUserMaxSeq(testUID1, 600) + userChat := GenUserChat(1, 600, 200, 0, testUID1) + err = CreateChat(userChat) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID1); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } + if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID1) + } + minSeq, err := GetUserMinSeq(testUID1) if err != nil { - t.Error(err.Error(), testUID1) + t.Error("err is not nil", testUID1, err.Error()) } - //db.DB.SetUserMaxSeq(testUID1, 6000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 4999) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 30000) - //db.DB.BatchInsertChat2DB() - // - //db.DB.SetUserMaxSeq(testUID1, 9999) - //db.DB.BatchInsertChat2DB() - cmd := exec.Command("/bin/bash", "unset $CONFIG_NAME") - _, err = cmd.StdoutPipe() + if minSeq != 201 { + t.Error("test1 is not the same", "minSeq:", minSeq, "targetSeq", 201) + } + + testUID2 := "test_del_id2" + err = DelChat(testUID2, 0) + err = DelChat(testUID2, 1) + err = SetUserMaxSeq(testUID2, 7000) + userChat = GenUserChat(1, 4999, 5000, 0, testUID2) + userChat2 := GenUserChat(5000, 7000, 6000, 1, testUID2) + err = CreateChat(userChat) + err = CreateChat(userChat2) + + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID2); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID2) + } + if err := checkMaxSeqWithMongo(operationID, testUID2, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID2) + } + minSeq, err = GetUserMinSeq(testUID2) if err != nil { - return + t.Error("err is not nil", testUID2, err.Error()) + } + if minSeq != 6001 { + t.Error("test2 is not the same", "minSeq:", minSeq, "targetSeq", 6001) } - //执行命令 - if err := cmd.Start(); err != nil { - return + testUID3 := "test_del_id3" + err = DelChat(testUID3, 0) + err = SetUserMaxSeq(testUID3, 4999) + userChat = GenUserChat(1, 4999, 5000, 0, testUID3) + err = CreateChat(userChat) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID3); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID3) } - for _, userID := range testUserIDList { - operationID = userID + "-" + operationID - if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } - if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", userID) - } + if err := checkMaxSeqWithMongo(operationID, testUID3, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID3) + } + minSeq, err = GetUserMinSeq(testUID3) + if err != nil { + t.Error("err is not nil", testUID3, err.Error()) + } + if minSeq != 5000 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 5000) } - testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"} - for _, groupID := range testWorkingGroupIDList { - operationID = groupID + "-" + operationID - log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList) - if err := ResetUserGroupMinSeq(operationID, groupID, testUserIDList); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } - if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", groupID) - } + testUID4 := "test_del_id4" + err = DelChat(testUID4, 0) + err = DelChat(testUID4, 1) + err = DelChat(testUID4, 2) + err = SetUserMaxSeq(testUID4, 12000) + userChat = GenUserChat(1, 4999, 5000, 0, testUID4) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID4) + userChat3 := GenUserChat(10000, 12000, 11000, 2, testUID4) + err = CreateChat(userChat) + err = CreateChat(userChat2) + err = CreateChat(userChat3) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID4); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + if err := checkMaxSeqWithMongo(operationID, testUID4, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + minSeq, err = GetUserMinSeq(testUID4) + if err != nil { + t.Error("err is not nil", testUID4, err.Error()) + } + if minSeq != 11001 { + t.Error("test4 is not the same", "minSeq:", minSeq, "targetSeq", 11001) + } + + testUID5 := "test_del_id5" + err = DelChat(testUID5, 0) + err = DelChat(testUID5, 1) + err = SetUserMaxSeq(testUID5, 9999) + userChat = GenUserChat(1, 4999, 5000, 0, testUID5) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID5) + err = CreateChat(userChat) + err = CreateChat(userChat2) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID5); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID4) + } + if err := checkMaxSeqWithMongo(operationID, testUID5, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID5) + } + minSeq, err = GetUserMinSeq(testUID5) + if err != nil { + t.Error("err is not nil", testUID5, err.Error()) + } + if minSeq != 10000 { + t.Error("test5 is not the same", "minSeq:", minSeq, "targetSeq", 10000) + } + + testUID6 := "test_del_id6" + err = DelChat(testUID5, 0) + err = DelChat(testUID5, 1) + err = DelChat(testUID5, 2) + err = DelChat(testUID5, 3) + userChat = GenUserChat(1, 4999, 5000, 0, testUID6) + userChat2 = GenUserChat(5000, 9999, 10000, 1, testUID6) + userChat3 = GenUserChat(10000, 14999, 13000, 2, testUID6) + userChat4 := GenUserChat(15000, 19999, 0, 3, testUID6) + err = CreateChat(userChat) + err = CreateChat(userChat2) + err = CreateChat(userChat3) + err = CreateChat(userChat4) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, testUID6); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID6) + } + if err := checkMaxSeqWithMongo(operationID, testUID6, constant.WriteDiffusion); err != nil { + t.Error("checkMaxSeqWithMongo failed", testUID6) + } + minSeq, err = GetUserMinSeq(testUID6) + if err != nil { + t.Error("err is not nil", testUID6, err.Error()) + } + if minSeq != 13001 { + t.Error("test3 is not the same", "minSeq:", minSeq, "targetSeq", 13001) } } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 9e4bcbfd4..484def4f5 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -8,8 +8,9 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "fmt" - "github.com/robfig/cron/v3" "time" + + "github.com/robfig/cron/v3" ) const cronTaskOperationID = "cronTaskOperationID-" @@ -57,6 +58,7 @@ func ClearAll() { } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } + // working group msg clear workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { @@ -77,9 +79,6 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } - if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), userID, err) - } } } @@ -98,10 +97,5 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } - for _, userID := range userIDList { - if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) - } - } } } diff --git a/internal/cron_task/test/main.go b/internal/cron_task/test/main.go deleted file mode 100644 index 5a2b3f44a..000000000 --- a/internal/cron_task/test/main.go +++ /dev/null @@ -1,6 +0,0 @@ -package main - -// -//func main() { -// db.DB.BatchInsertChat() -//} diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 9b3475248..7872a2d4d 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -79,6 +79,9 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { return } } + if pushMsg.MsgData.ContentType > constant.NotificationBegin && pushMsg.MsgData.ContentType < constant.NotificationEnd && pushMsg.MsgData.ContentType != constant.SignalingNotification { + return + } if pushMsg.MsgData.ContentType == constant.SignalingNotification { isSend, err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData, pushMsg.PushToUserID) if err != nil { @@ -198,6 +201,9 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) successCount++ if isOfflinePush { + if pushMsg.MsgData.ContentType > constant.NotificationBegin && pushMsg.MsgData.ContentType < constant.NotificationEnd && pushMsg.MsgData.ContentType != constant.SignalingNotification { + return + } var onlineSuccessUserIDList []string onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID) for _, v := range wsResult { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 4d68582e5..bc623c0d8 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -365,6 +365,14 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite var resp pbGroup.InviteUserToGroupResp joinReq := pbGroup.JoinGroupReq{} for _, v := range req.InvitedUserIDList { + if imdb.IsExistGroupMember(req.GroupID, v) { + log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v) + var resultNode pbGroup.Id2Result + resultNode.Result = -1 + resultNode.UserID = v + resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) + continue + } var groupRequest db.GroupRequest groupRequest.UserID = v groupRequest.GroupID = req.GroupID @@ -452,8 +460,19 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) } } else { - okUserIDList = req.InvitedUserIDList - if err := db.DB.AddUserToSuperGroup(req.GroupID, req.InvitedUserIDList); err != nil { + for _, v := range req.InvitedUserIDList { + if imdb.IsExistGroupMember(req.GroupID, v) { + log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v) + var resultNode pbGroup.Id2Result + resultNode.Result = -1 + resp.Id2ResultList = append(resp.Id2ResultList, &resultNode) + continue + } else { + okUserIDList = append(okUserIDList, v) + } + } + //okUserIDList = req.InvitedUserIDList + if err := db.DB.AddUserToSuperGroup(req.GroupID, okUserIDList); err != nil { log.NewError(req.OperationID, "AddUserToSuperGroup failed ", req.GroupID, err) return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil } @@ -862,6 +881,10 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G log.NewError(req.OperationID, "GroupApplicationResponse failed ", err.Error(), req.FromUserID) return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } + if imdb.IsExistGroupMember(req.GroupID, req.FromUserID) { + log.NewInfo(req.OperationID, "GroupApplicationResponse user in group", req.GroupID, req.FromUserID) + return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{}}, nil + } member := db.GroupMember{} member.GroupID = req.GroupID member.UserID = req.FromUserID @@ -977,12 +1000,15 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) (*pbGroup.JoinGroupResp, error) { log.NewInfo(req.OperationID, "JoinGroup args ", req.String()) + if imdb.IsExistGroupMember(req.GroupID, req.OpUserID) { + log.NewInfo(req.OperationID, "IsExistGroupMember", req.GroupID, req.OpUserID) + return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{}}, nil + } _, err := imdb.GetUserByUserID(req.OpUserID) if err != nil { log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OpUserID) return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.GroupID, err) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index b3a3cb745..39380b003 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -25,6 +25,7 @@ import ( "time" promePkg "Open_IM/pkg/common/prometheus" + go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" ) @@ -125,7 +126,7 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { return true, 0, "", nil } - if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { + if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType != constant.SignalingNotification { return true, 0, "", nil } log.NewDebug(data.OperationID, *config.Config.MessageVerify.FriendVerify) diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 8ae026522..4d7d01f80 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -10,13 +10,14 @@ import ( "context" "errors" "fmt" + "math/rand" + "sync" + "github.com/go-redis/redis/v8" "github.com/gogo/protobuf/sortkeys" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "math/rand" - "sync" //"github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" @@ -206,6 +207,13 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat return nil } +func (d *DataBases) UpdateOneMsgList(msg *UserChat) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + _, err := c.UpdateOne(ctx, bson.M{"uid": msg.UID}, bson.M{"$set": bson.M{"msg": msg.Msg}}) + return err +} + func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList) var hasSeqList []uint32 diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go index a9dd6d98e..f6f5b2637 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go @@ -27,7 +27,9 @@ func UpdateGroupRequest(groupRequest db.GroupRequest) error { } func InsertIntoGroupRequest(toInsertInfo db.GroupRequest) error { - DelGroupRequestByGroupIDAndUserID(toInsertInfo.GroupID, toInsertInfo.UserID) + if err := DelGroupRequestByGroupIDAndUserID(toInsertInfo.GroupID, toInsertInfo.UserID); err != nil { + return err + } if toInsertInfo.HandledTime.Unix() < 0 { toInsertInfo.HandledTime = utils.UnixSecondToTime(0) } @@ -70,7 +72,7 @@ func GetGroupRequestByGroupID(groupID string) ([]db.GroupRequest, error) { return groupRequestList, nil } -//received +// received func GetGroupApplicationList(userID string) ([]db.GroupRequest, error) { var groupRequestList []db.GroupRequest memberList, err := GetGroupMemberListByUserID(userID) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 97a0f5d6e..57017136c 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,6 +8,8 @@ package kafka import ( "context" + "fmt" + "github.com/Shopify/sarama" ) @@ -31,6 +33,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str //fmt.Println("init address is ", addrs, "topics is ", topics) consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if err != nil { + fmt.Println("args:", addrs, groupID, config) panic(err.Error()) } return &MConsumerGroup{ diff --git a/script/check_all.sh b/script/check_all.sh index 76535c79f..af3d44ceb 100644 --- a/script/check_all.sh +++ b/script/check_all.sh @@ -54,13 +54,13 @@ else fi -#check=$(ps aux | grep -w ./${cron_task_name} | grep -v grep | wc -l) -#if [ $check -ge 1 ]; then -# echo -e ${GREEN_PREFIX}"none port has been listening,belongs service is openImCronTask"${COLOR_SUFFIX} -#else -# echo -e ${RED_PREFIX}"cron_task_name service does not start normally"${COLOR_SUFFIX} -# echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} -# exit -1 -#fi -# -#echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} +check=$(ps aux | grep -w ./${cron_task_name} | grep -v grep | wc -l) +if [ $check -ge 1 ]; then + echo -e ${GREEN_PREFIX}"none port has been listening,belongs service is openImCronTask"${COLOR_SUFFIX} +else + echo -e ${RED_PREFIX}"cron_task_name service does not start normally"${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} + exit -1 +fi + +echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} diff --git a/script/start_all.sh b/script/start_all.sh index 223187973..4a5f7d65e 100644 --- a/script/start_all.sh +++ b/script/start_all.sh @@ -10,7 +10,7 @@ need_to_start_server_shell=( sdk_svr_start.sh msg_gateway_start.sh demo_svr_start.sh -# start_cron.sh + start_cron.sh ) time=`date +"%Y-%m-%d %H:%M:%S"` echo "==========================================================">>../logs/openIM.log 2>&1 &