add timer update redis minSeq

This commit is contained in:
Gordon 2021-11-10 18:13:04 +08:00
parent a08aeee0c2
commit b729cac998
3 changed files with 85 additions and 31 deletions

View File

@ -1,42 +1,64 @@
package main package main
import ( import (
"Open_IM/pkg/common/config" commonDB "Open_IM/pkg/common/db"
"Open_IM/pkg/common/db" "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"fmt" "Open_IM/pkg/common/log"
"time" "time"
) )
func main() { func main() {
//for {
// fmt.Println("start delete mongodb expired record")
// timeUnixBegin := time.Now().Unix()
// count, _ := db.DB.MgoUserCount()
// fmt.Println("mongodb record count: ", count)
// for i := 0; i < count; i++ {
// time.Sleep(1 * time.Millisecond)
// uid, _ := db.DB.MgoSkipUID(i)
// fmt.Println("operate uid: ", uid)
// err := db.DB.DelUserChat(uid)
// if err != nil {
// fmt.Println("operate uid failed: ", uid, err.Error())
// }
// }
//
// timeUnixEnd := time.Now().Unix()
// costTime := timeUnixEnd - timeUnixBegin
// if costTime > int64(config.Config.Mongo.DBRetainChatRecords*24*3600) {
// continue
// } else {
// sleepTime := 0
// if int64(config.Config.Mongo.DBRetainChatRecords*24*3600)-costTime > 24*3600 {
// sleepTime = 24 * 3600
// } else {
// sleepTime = config.Config.Mongo.DBRetainChatRecords*24*3600 - int(costTime)
// }
// fmt.Println("sleep: ", sleepTime)
// time.Sleep(time.Duration(sleepTime) * time.Second)
// }
//}
for { for {
fmt.Println("start delete mongodb expired record") uidList, err := im_mysql_model.SelectAllUID()
timeUnixBegin := time.Now().Unix() if err != nil {
count, _ := db.DB.MgoUserCount() log.NewError("999999", err.Error())
fmt.Println("mongodb record count: ", count) } else {
for i := 0; i < count; i++ { for _, v := range uidList {
time.Sleep(1 * time.Millisecond) minSeq, err := commonDB.DB.GetMinSeqFromMongo(v)
uid, _ := db.DB.MgoSkipUID(i) if err != nil {
fmt.Println("operate uid: ", uid) log.NewError("999999", "get user minSeq err", err.Error(), v)
err := db.DB.DelUserChat(uid) continue
if err != nil { } else {
fmt.Println("operate uid failed: ", uid, err.Error()) err := commonDB.DB.SetUserMinSeq(v, minSeq)
if err != nil {
log.NewError("999999", "set user minSeq err", err.Error(), v)
}
}
time.Sleep(time.Duration(100) * time.Millisecond)
} }
} }
timeUnixEnd := time.Now().Unix()
costTime := timeUnixEnd - timeUnixBegin
if costTime > int64(config.Config.Mongo.DBRetainChatRecords*24*3600) {
continue
} else {
sleepTime := 0
if int64(config.Config.Mongo.DBRetainChatRecords*24*3600)-costTime > 24*3600 {
sleepTime = 24 * 3600
} else {
sleepTime = config.Config.Mongo.DBRetainChatRecords*24*3600 - int(costTime)
}
fmt.Println("sleep: ", sleepTime)
time.Sleep(time.Duration(sleepTime) * time.Second)
}
} }
} }

View File

@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat" pbMsg "Open_IM/pkg/proto/chat"
"errors" "errors"
"github.com/garyburd/redigo/redis"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
"strconv" "strconv"
@ -87,6 +88,34 @@ func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (Single
return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil
} }
func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq int64, err error) {
var i int64
var seqUid string
session := d.mgoSession.Clone()
if session == nil {
return MinSeq, errors.New("session == nil")
}
defer session.Close()
c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
MaxSeq, err := d.GetUserMaxSeq(uid)
if err != nil && err != redis.ErrNil {
return MinSeq, err
}
NB := MaxSeq / singleGocMsgNum
for i = 0; i <= NB; i++ {
seqUid = indexGen(uid, i)
n, err := c.Find(bson.M{"uid": seqUid}).Count()
if err == nil && n != 0 {
if i == 0 {
MinSeq = 1
} else {
MinSeq = i * singleGocMsgNum
}
break
}
}
return MinSeq, nil
}
func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) {
allCount := 0 allCount := 0
singleCount := 0 singleCount := 0
@ -316,7 +345,7 @@ func getCurrentTimestampByMill() int64 {
} }
func getSeqUid(uid string, seq int64) string { func getSeqUid(uid string, seq int64) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocMsgNum
return uid + ":" + strconv.FormatInt(seqSuffix, 10) return indexGen(uid, seqSuffix)
} }
func isContainInt64(target int64, List []int64) bool { func isContainInt64(target int64, List []int64) bool {
@ -329,3 +358,6 @@ func isContainInt64(target int64, List []int64) bool {
return false return false
} }
func indexGen(uid string, seqSuffix int64) string {
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
}

View File

@ -45,9 +45,9 @@ func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) {
} }
//Set the user's minimum seq //Set the user's minimum seq
func (d *DataBases) SetUserMinSeq(uid string) (err error) { func (d *DataBases) SetUserMinSeq(uid string, minSeq int64) (err error) {
key := userMinSeq + uid key := userMinSeq + uid
_, err = d.Exec("SET", key) _, err = d.Exec("SET", key, minSeq)
return err return err
} }