diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index f86e4adac..481b3b104 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -164,6 +164,21 @@ func msgListIsFull(chat *db.UserChat) bool { return false } +func CheckGroupUserMinSeq(operationID, groupID, userID string, diffusionType int) error { + return nil +} + +func CheckUserMinSeqWithMongo(operationID, userID string, diffusionType int) error { + //var seqRedis uint64 + //var err error + //if diffusionType == constant.WriteDiffusion { + // seqRedis, err = db.DB.GetUserMinSeq(ID) + //} else { + // seqRedis, err = db.DB.GetGroupUserMinSeq(ID) + //} + return nil +} + func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { var seqRedis uint64 var err error @@ -185,17 +200,10 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { if msg == nil { return nil } - var seqMongo uint32 - msgPb := &server_api_params.MsgData{} - err = proto.Unmarshal(msg.Msg, msgPb) - if err != nil { - return utils.Wrap(err, "") - } - seqMongo = msgPb.Seq - if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 { - log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msgPb.Status, msg.SendTime) + if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 { + log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime) } else { - log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "seq and msg OK", "status:", msgPb.Status, msg.SendTime) + log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "seq and msg OK", "status:", msg.Status, msg.SendTime) } return nil } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 1fdef997e..f08cc3db4 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -30,11 +30,8 @@ func StartCronTask(userID, workingGroupID string) { fmt.Println("clear msg finished") return } - clearFunc := func() { - ClearAll() - } c := cron.New() - _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearFunc) + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll) if err != nil { fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) panic(err) @@ -53,7 +50,6 @@ func getCronTaskOperationID() string { func ClearAll() { operationID := getCronTaskOperationID() log.NewInfo(operationID, "====================== start del cron task ======================") - //var userIDList []string var err error userIDList, err := im_mysql_model.SelectAllUserID() if err == nil { @@ -61,7 +57,6 @@ func ClearAll() { } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } - //return // working group msg clear workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { @@ -82,6 +77,9 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } + if err := CheckUserMinSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), userID, err) + } } } @@ -100,5 +98,10 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } + for _, userID := range userIDList { + if err := CheckGroupUserMinSeq(operationID, groupID, userID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) + } + } } } diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index aecf82238..ce5bf1c7c 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -15,10 +15,12 @@ import ( "bytes" "context" "encoding/gob" - "github.com/golang/protobuf/proto" - "github.com/gorilla/websocket" "runtime" "strings" + + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" + "google.golang.org/grpc" ) func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { @@ -150,7 +152,8 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { return } msgClient := pbChat.NewMsgClient(grpcConn) - reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq) + maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20) + reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq, maxSizeOption) if err != nil { log.NewError(rpcReq.OperationID, "pullMsgBySeqListReq err", err.Error()) nReply.ErrCode = 200 @@ -403,7 +406,7 @@ func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) { if isPass { req := pData.(*sdk_ws.SetAppBackgroundStatusReq) conn.IsBackground = req.IsBackground - callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.platformID), conn.token, req.IsBackground) + callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.platformID), conn.token, conn.IsBackground) if callbackResp.ErrCode != 0 { log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index cff59c1a6..a096e4eb4 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -70,7 +70,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { wsResult = append(wsResult, reply.SinglePushResult...) } } - log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) + log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData, "isOfflinePush", isOfflinePush) successCount++ if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { // save invitation info for offline push diff --git a/internal/rpc/group/super_group.go b/internal/rpc/group/super_group.go index b667abe76..070c95256 100644 --- a/internal/rpc/group/super_group.go +++ b/internal/rpc/group/super_group.go @@ -8,6 +8,7 @@ import ( commonPb "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" + "github.com/go-redis/redis/v8" ) diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go new file mode 100644 index 000000000..6cb5870b7 --- /dev/null +++ b/internal/rpc/msg/extend_msg.go @@ -0,0 +1 @@ +package msg diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 415d6fff1..f1329d47a 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -10,11 +10,12 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" - "github.com/golang/protobuf/proto" "net" "strconv" "strings" + "github.com/golang/protobuf/proto" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" @@ -94,8 +95,12 @@ func (rpc *rpcChat) Run() { panic("listening err:" + err.Error() + rpc.rpcRegisterName) } log.Info("", "listen network success, address ", address) - - var grpcOpts []grpc.ServerOption + recvSize := 1024 * 1024 * 30 + sendSize := 1024 * 1024 * 30 + var grpcOpts = []grpc.ServerOption{ + grpc.MaxRecvMsgSize(recvSize), + grpc.MaxSendMsgSize(sendSize), + } if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index c072b1b71..e712afcfd 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -107,6 +107,17 @@ func userIsMuteInGroup(groupID, userID string) (bool, error) { return false, nil } +func groupIsMuted(groupID string) (bool, error) { + groupInfo, err := rocksCache.GetGroupInfoFromCache(groupID) + if err != nil { + return false, utils.Wrap(err, "GetGroupInfoFromCache failed") + } + if groupInfo.Status == constant.GroupStatusMuted { + return true, nil + } + return false, nil +} + func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) { switch data.MsgData.SessionType { case constant.SingleChatType: @@ -182,7 +193,15 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s return false, 202, "you are not in group", nil } } - isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID) + isMute, err := groupIsMuted(data.MsgData.GroupID) + if err != nil { + errMsg := data.OperationID + err.Error() + return false, 223, errMsg, nil + } + if isMute { + return false, 225, "group id muted", nil + } + isMute, err = userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID) if err != nil { errMsg := data.OperationID + err.Error() return false, 223, errMsg, nil @@ -245,7 +264,15 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s return false, 202, "you are not in group", nil } } - isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID) + isMute, err := groupIsMuted(data.MsgData.GroupID) + if err != nil { + errMsg := data.OperationID + err.Error() + return false, 223, errMsg, nil + } + if isMute { + return false, 225, "group id muted", nil + } + isMute, err = userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID) if err != nil { errMsg := data.OperationID + err.Error() return false, 223, errMsg, nil diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go new file mode 100644 index 000000000..1edc5cc25 --- /dev/null +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -0,0 +1,49 @@ +package db + +type ExtendMsgSet struct { + ID string `bson:"id" json:"ID"` + ExtendMsg []*ExtendMsg `bson:"extend_msg" json:"extendMsg"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` + ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` + CreateTime int32 `bson:"create_time" json:"createTime"` +} + +type ExtendMsg struct { + SendID string `bson:"send_id" json:"sendID"` + ServerMsgID string `bson:"server_msg_id" json:"serverMsgID"` + Ex string `bson:"ex" json:"ex"` + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + LikeUserIDList []string `bson:"like_user_id_list" json:"likeUserIDList"` + Content string `bson:"content" json:"content"` + ExtendMsgComments []*ExtendMsgComment `bson:"extend_msg_comments" json:"extendMsgComment"` + Vote *Vote `bson:"vote" json:"vote"` + Urls []string `bson:"urls" json:"urls"` + CreateTime int32 `bson:"create_time" json:"createTime"` +} + +type Vote struct { + Content string `bson:"content" json:"content"` + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` + Options []*Options `bson:"options" json:"options"` +} + +type Options struct { + Content string `bson:"content" json:"content"` + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` + VoteUserIDList []string `bson:"vote_user_id_list" json:"voteUserIDList"` +} + +type ExtendMsgComment struct { + UserID string `bson:"user_id" json:"userID"` + ReplyUserID string `bson:"reply_user_id" json:"replyUserID"` + ReplyContentID string `bson:"reply_content_id" json:"replyContentID"` + ContentID string `bson:"content_id" json:"contentID"` + Content string `bson:"content" json:"content"` + CreateTime int32 `bson:"create_time" json:"createTime"` + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` +} diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 88805b902..5bd96032e 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -318,7 +318,7 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error { return err } -func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { +func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) @@ -334,13 +334,53 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { } if len(userChats) > 0 { if len(userChats[0].Msg) > 0 { - return &userChats[0].Msg[len(userChats[0].Msg)-1], nil + msgPb := &open_im_sdk.MsgData{} + err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb) + if err != nil { + return nil, utils.Wrap(err, "") + } + return msgPb, nil } return nil, errors.New("len(userChats[0].Msg) < 0") } return nil, nil } +func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + regex := fmt.Sprintf("^%s", ID) + findOpts := options.Find().SetLimit(1).SetSort(bson.M{"uid": 1}) + var userChats []UserChat + cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) + if err != nil { + return nil, err + } + err = cursor.All(ctx, &userChats) + if err != nil { + return nil, utils.Wrap(err, "") + } + var oldestMsg []byte + if len(userChats) > 0 { + for _, v := range userChats[0].Msg { + if v.SendTime != 0 { + oldestMsg = v.Msg + break + } + } + if len(oldestMsg) == 0 { + oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg + } + msgPb := &open_im_sdk.MsgData{} + err = proto.Unmarshal(oldestMsg, msgPb) + if err != nil { + return nil, utils.Wrap(err, "") + } + return msgPb, nil + } + return nil, nil +} + func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0 diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index be9822b3e..cda798100 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -8,7 +8,6 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "errors" "fmt" "math/big" "sort" @@ -408,9 +407,6 @@ func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) { if err != nil { return "", utils.Wrap(err, "") } - if len(userToSuperGroup.GroupIDList) == 0 { - return "", errors.New("GroupIDList == 0") - } bytes, err := json.Marshal(userToSuperGroup.GroupIDList) if err != nil { return "", utils.Wrap(err, "")