diff --git a/internal/api/third/minio_storage_credential.go b/internal/api/third/minio_storage_credential.go index 4c6fad1f2..07b256e0f 100644 --- a/internal/api/third/minio_storage_credential.go +++ b/internal/api/third/minio_storage_credential.go @@ -15,6 +15,8 @@ import ( _ "github.com/minio/minio-go/v7" cr "github.com/minio/minio-go/v7/pkg/credentials" "net/http" + "strconv" + "strings" ) // @Summary minio上传文件(web api) @@ -218,6 +220,13 @@ func UploadUpdateApp(c *gin.Context) { c.JSON(http.StatusOK, resp) } +func version2Int(version string) (int, error) { + versions := strings.Split(version, ".") + s := strings.Join(versions, "") + versionInt, err := strconv.Atoi(s) + return versionInt, err +} + func GetDownloadURL(c *gin.Context) { var ( req api.GetDownloadURLReq @@ -238,7 +247,13 @@ func GetDownloadURL(c *gin.Context) { } log.Debug(req.OperationID, utils.GetSelfFuncName(), "app: ", app) if app != nil { - if app.Version != req.Version && app.Version != "" { + appVersion, err := version2Int(app.Version) + reqVersion, err := version2Int(req.Version) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), "req version", req.Version, "app version", app.Version) + } + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "req version:", reqVersion, "app version:", appVersion) + if appVersion > reqVersion && app.Version != "" { resp.Data.HasNewVersion = true if app.ForceUpdate == true { resp.Data.ForceUpdate = true diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 41c1b18e0..3c43e3e20 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -2,11 +2,13 @@ package cronTask import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "github.com/golang/protobuf/proto" + "math" ) const oldestList = 0 @@ -19,6 +21,7 @@ func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) erro log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") return utils.Wrap(err, "") } + log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delMsgIDList, "minSeq", minSeq) for _, userID := range userIDList { userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) if err != nil { @@ -43,7 +46,7 @@ func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { if err != nil { return utils.Wrap(err, "") } - log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", userID, delMsgIDList) + log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", delMsgIDList, "minSeq", minSeq) err = db.DB.SetUserMinSeq(userID, minSeq) return err } @@ -123,3 +126,29 @@ func getDelMaxSeqByIDList(delMsgIDList [][2]interface{}) uint32 { } return delMsgIDList[len(delMsgIDList)-1][1].(uint32) } + +func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error { + var maxSeq uint64 + var err error + if diffusionType == constant.WriteDiffusion { + maxSeq, err = db.DB.GetUserMaxSeq(ID) + } else { + maxSeq, err = db.DB.GetGroupMaxSeq(ID) + } + if err != nil { + return utils.Wrap(err, "GetUserMaxSeq failed") + } + msg, err := db.DB.GetNewestMsg(ID) + if err != nil { + return utils.Wrap(err, "GetNewestMsg failed") + } + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msg.Msg, msgPb) + if err != nil { + return utils.Wrap(err, "") + } + if math.Abs(float64(msgPb.Seq-uint32(maxSeq))) > 10 { + log.NewWarn(operationID, utils.GetSelfFuncName(), maxSeq, msgPb.Seq, "redis maxSeq is different with msg.Seq") + } + return nil +} diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 625ab539c..0b1784b20 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -28,6 +28,9 @@ func StartCronTask() { if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) } + if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), userID, err) + } } } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) @@ -45,7 +48,9 @@ func StartCronTask() { if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) } - + if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) + } } } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) @@ -56,6 +61,7 @@ func StartCronTask() { fmt.Println("start cron failed", err.Error()) panic(err) } + c.Start() fmt.Println("start cron task success") for { diff --git a/internal/cron_task/test/main.go b/internal/cron_task/test/main.go new file mode 100644 index 000000000..38dd16da6 --- /dev/null +++ b/internal/cron_task/test/main.go @@ -0,0 +1,3 @@ +package main + +func main() {} diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index a02a57ea8..69b4584f9 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -83,6 +83,7 @@ type Alert struct { type Android struct { Ups struct { Notification Notification `json:"notification"` + Options Options `json:"options"` } `json:"ups"` } @@ -92,6 +93,18 @@ type Notification struct { ClickType string `json:"click_type"` } +type Options struct { + HW struct { + DefaultSound bool `json:"/message/android/notification/default_sound"` + ChannelID string `json:"/message/android/notification/channel_id"` + Sound string `json:"/message/android/notification/sound"` + Importance string `json:"/message/android/notification/importance"` + } `json:"HW"` + XM struct { + ChannelID string `json:"/extra.channel_id"` + } `json:""` +} + type PushResp struct { } @@ -133,6 +146,17 @@ func (g *Getui) Push(userIDList []string, alert, detailContent, operationID stri Body: alert, ClickType: "startapp", } + pushReq.PushChannel.Android.Ups.Options = Options{ + HW: struct { + DefaultSound bool `json:"/message/android/notification/default_sound"` + ChannelID string `json:"/message/android/notification/channel_id"` + Sound string `json:"/message/android/notification/sound"` + Importance string `json:"/message/android/notification/importance"` + }{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "importance"}, + XM: struct { + ChannelID string `json:"/extra.channel_id"` + }{ChannelID: "Default"}, + } pushResp := PushResp{} err = g.request(PushURL, pushReq, token, &pushResp, operationID) switch err { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index fecad1d32..0eab7c022 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -227,6 +227,11 @@ const ( WorkMomentAtUserNotification = 2 ) +const ( + WriteDiffusion = 0 + ReadDiffusion = 1 +) + const ( AtAllString = "AtAllTag" AtNormal = 0 diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 083d89b2a..47b556b36 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -259,18 +259,22 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st return seqMsg, nil } -func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (msg *UserChat, err error) { +func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) regex := fmt.Sprintf("^%s", ID) - findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"$regex": regex}) - msg = &UserChat{} + findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"$regex": regex}).SetSort(bson.M{"uid": 1}) + var msgs []UserChat cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) if err != nil { return nil, err } - err = cursor.Decode(&msg) - return msg, err + err = cursor.Decode(&msgs) + if len(msgs) > 0 { + return &msgs[0], err + } else { + return nil, errors.New("get msg list failed") + } } func (d *DataBases) DelMongoMsgs(IDList []string) error { @@ -298,6 +302,26 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error { return err } +func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + regex := fmt.Sprintf("^%s", ID) + findOpts := options.Find().SetLimit(1).SetSort(bson.M{"$regex": regex}).SetSort(bson.M{"uid": -1}) + var userChats []UserChat + cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts) + if err != nil { + return nil, err + } + err = cursor.Decode(&userChats) + if len(userChats) > 0 { + if len(userChats[0].Msg) > 0 { + return &userChats[0].Msg[len(userChats[0].Msg)], nil + } + return nil, errors.New("len(userChats[0].Msg) < 0") + } + return nil, errors.New("len(userChats) < 0") +} + func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0