This commit is contained in:
wangchuxiao 2023-01-16 20:14:26 +08:00
parent 80b22e0e0e
commit 822db2a63e
35 changed files with 590 additions and 686 deletions

View File

@ -4,6 +4,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/log"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
@ -89,7 +90,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
msgs, err := db.DB.GetUserMsgListByIndex(ID, index)
if err != nil || msgs.UID == "" {
if err != nil {
if err == db.ErrMsgListNotExist {
if err == mongo.ErrMsgListNotExist {
log.NewInfo(operationID, utils.GetSelfFuncName(), "ID:", ID, "index:", index, err.Error())
} else {
log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID)
@ -103,7 +104,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
return delStruct.getSetMinSeq() + 1, nil
}
log.NewDebug(operationID, "ID:", ID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if len(msgs.Msg) > db.GetSingleGocMsgNum() {
if len(msgs.Msg) > mongo.GetSingleGocMsgNum() {
log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID)
}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) > utils.GetCurrentTimestampByMill() && msgListIsFull(msgs) {
@ -149,7 +150,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
return seq, utils.Wrap(err, "deleteMongoMsg failed")
}
func msgListIsFull(chat *db.UserChat) bool {
func msgListIsFull(chat *mongo.UserChat) bool {
index, _ := strconv.Atoi(strings.Split(chat.UID, ":")[1])
if index == 0 {
if len(chat.Msg) >= 4999 {

View File

@ -2,7 +2,7 @@ package cronTask
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
mongo2 "Open_IM/pkg/common/db/mongo"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"context"
"fmt"
@ -22,8 +22,8 @@ var (
mongoClient *mongo.Collection
)
func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.UserChat {
chat := &db.UserChat{UID: userID + strconv.Itoa(int(index))}
func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *mongo2.UserChat {
chat := &mongo2.UserChat{UID: userID + strconv.Itoa(int(index))}
for i := startSeq; i <= stopSeq; i++ {
msg := server_api_params.MsgData{
SendID: "sendID1",
@ -45,7 +45,7 @@ func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *db.Use
}
bytes, _ := proto.Marshal(&msg)
sendTime := 0
chat.Msg = append(chat.Msg, db.MsgInfo{SendTime: int64(sendTime), Msg: bytes})
chat.Msg = append(chat.Msg, mongo2.MsgInfo{SendTime: int64(sendTime), Msg: bytes})
}
return chat
}
@ -54,7 +54,7 @@ func SetUserMaxSeq(userID string, seq int) error {
return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err()
}
func CreateChat(userChat *db.UserChat) error {
func CreateChat(userChat *mongo2.UserChat) error {
_, err := mongoClient.InsertOne(context.Background(), userChat)
return err
}

View File

@ -5,6 +5,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/mongo"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
@ -70,14 +71,14 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
}
if !notification.IsReact {
// first time to modify
var reactionExtensionList = make(map[string]db.KeyValue)
extendMsg := db.ExtendMsg{
var reactionExtensionList = make(map[string]mongo.KeyValue)
extendMsg := mongo.ExtendMsg{
ReactionExtensionList: reactionExtensionList,
ClientMsgID: notification.ClientMsgID,
MsgFirstModifyTime: notification.MsgFirstModifyTime,
}
for _, v := range notification.SuccessReactionExtensionList {
reactionExtensionList[v.TypeKey] = db.KeyValue{
reactionExtensionList[v.TypeKey] = mongo.KeyValue{
TypeKey: v.TypeKey,
Value: v.Value,
LatestUpdateTime: v.LatestUpdateTime,

View File

@ -5,6 +5,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
@ -296,7 +297,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR
}
}
var tagSendLogs db.TagSendLog
var tagSendLogs mongo.TagSendLog
var wg sync.WaitGroup
wg.Add(len(successUserIDList))
var lock sync.Mutex
@ -309,7 +310,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR
return
}
lock.Lock()
tagSendLogs.UserList = append(tagSendLogs.UserList, db.TagUser{
tagSendLogs.UserList = append(tagSendLogs.UserList, mongo.TagUser{
UserID: userID,
UserName: userName,
})
@ -388,10 +389,10 @@ func (s *officeServer) GetUserTagByID(_ context.Context, req *pbOffice.GetUserTa
func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.CreateOneWorkMomentReq) (resp *pbOffice.CreateOneWorkMomentResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbOffice.CreateOneWorkMomentResp{CommonResp: &pbOffice.CommonResp{}}
workMoment := db.WorkMoment{
Comments: []*db.Comment{},
LikeUserList: []*db.WorkMomentUser{},
PermissionUserList: []*db.WorkMomentUser{},
workMoment := mongo.WorkMoment{
Comments: []*mongo.Comment{},
LikeUserList: []*mongo.WorkMomentUser{},
PermissionUserList: []*mongo.WorkMomentUser{},
}
createUser, err := imdb.GetUserByUserID(req.WorkMoment.UserID)
if err != nil {
@ -405,14 +406,14 @@ func (s *officeServer) CreateOneWorkMoment(_ context.Context, req *pbOffice.Crea
workMoment.UserName = createUser.Nickname
workMoment.FaceURL = createUser.FaceURL
workMoment.PermissionUserIDList = s.getPermissionUserIDList(req.OperationID, req.WorkMoment.PermissionGroupList, req.WorkMoment.PermissionUserList)
workMoment.PermissionUserList = []*db.WorkMomentUser{}
workMoment.PermissionUserList = []*mongo.WorkMomentUser{}
for _, userID := range workMoment.PermissionUserIDList {
userName, err := imdb.GetUserNameByUserID(userID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error())
continue
}
workMoment.PermissionUserList = append(workMoment.PermissionUserList, &db.WorkMomentUser{
workMoment.PermissionUserList = append(workMoment.PermissionUserList, &mongo.WorkMomentUser{
UserID: userID,
UserName: userName,
})
@ -502,7 +503,7 @@ func (s *officeServer) DeleteOneWorkMoment(_ context.Context, req *pbOffice.Dele
return resp, nil
}
func isUserCanSeeWorkMoment(userID string, workMoment db.WorkMoment) bool {
func isUserCanSeeWorkMoment(userID string, workMoment mongo.WorkMoment) bool {
if userID != workMoment.UserID {
switch workMoment.Permission {
case constant.WorkMomentPublic:
@ -569,7 +570,7 @@ func (s *officeServer) CommentOneWorkMoment(_ context.Context, req *pbOffice.Com
return resp, nil
}
}
comment := &db.Comment{
comment := &mongo.Comment{
UserID: req.UserID,
UserName: commentUser.Nickname,
ReplyUserID: req.ReplyUserID,
@ -643,7 +644,7 @@ func (s *officeServer) GetUserWorkMoments(_ context.Context, req *pbOffice.GetUs
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbOffice.GetUserWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}}
resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber}
var workMoments []db.WorkMoment
var workMoments []mongo.WorkMoment
if req.UserID == req.OpUserID {
workMoments, err = db.DB.GetUserSelfWorkMoments(req.UserID, req.Pagination.ShowNumber, req.Pagination.PageNumber)
} else {

View File

@ -1,4 +1,4 @@
package db
package cache
import (
"Open_IM/pkg/common/config"
@ -42,135 +42,175 @@ const (
exTypeKeyLocker = "EX_LOCK:"
)
func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) {
func InitRedis(ctx context.Context) go_redis.UniversalClient {
var rdb go_redis.UniversalClient
var err error
if config.Config.Redis.EnableCluster {
rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
_, err = rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress)
panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
} else {
rdb = go_redis.NewClient(&go_redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
_, err = rdb.Ping(ctx).Result()
if err != nil {
panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
}
return rdb
}
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
}
type RedisClient struct {
rdb go_redis.UniversalClient
}
func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) {
key := accountTempCode + account
n, err := d.RDB.Exists(context.Background(), key).Result()
n, err := r.rdb.Exists(context.Background(), key).Result()
if n > 0 {
return true, err
} else {
return false, err
}
}
func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) {
func (r *RedisClient) SetAccountCode(account string, code, ttl int) (err error) {
key := accountTempCode + account
return d.RDB.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err()
return r.rdb.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err()
}
func (d *DataBases) GetAccountCode(account string) (string, error) {
func (r *RedisClient) GetAccountCode(account string) (string, error) {
key := accountTempCode + account
return d.RDB.Get(context.Background(), key).Result()
return r.rdb.Get(context.Background(), key).Result()
}
//Perform seq auto-increment operation of user messages
func (d *DataBases) IncrUserSeq(uid string) (uint64, error) {
func (r *RedisClient) IncrUserSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
seq, err := d.RDB.Incr(context.Background(), key).Result()
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
}
//Get the largest Seq
func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) {
func (r *RedisClient) GetUserMaxSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
seq, err := d.RDB.Get(context.Background(), key).Result()
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
//set the largest Seq
func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error {
func (r *RedisClient) SetUserMaxSeq(uid string, maxSeq uint64) error {
key := userIncrSeq + uid
return d.RDB.Set(context.Background(), key, maxSeq, 0).Err()
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
//Set the user's minimum seq
func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) {
func (r *RedisClient) SetUserMinSeq(uid string, minSeq uint32) (err error) {
key := userMinSeq + uid
return d.RDB.Set(context.Background(), key, minSeq, 0).Err()
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
//Get the smallest Seq
func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) {
func (r *RedisClient) GetUserMinSeq(uid string) (uint64, error) {
key := userMinSeq + uid
seq, err := d.RDB.Get(context.Background(), key).Result()
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (d *DataBases) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) {
func (r *RedisClient) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return d.RDB.Set(context.Background(), key, minSeq, 0).Err()
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
func (d *DataBases) GetGroupUserMinSeq(groupID, userID string) (uint64, error) {
func (r *RedisClient) GetGroupUserMinSeq(groupID, userID string) (uint64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := d.RDB.Get(context.Background(), key).Result()
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (d *DataBases) GetGroupMaxSeq(groupID string) (uint64, error) {
func (r *RedisClient) GetGroupMaxSeq(groupID string) (uint64, error) {
key := groupMaxSeq + groupID
seq, err := d.RDB.Get(context.Background(), key).Result()
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (d *DataBases) IncrGroupMaxSeq(groupID string) (uint64, error) {
func (r *RedisClient) IncrGroupMaxSeq(groupID string) (uint64, error) {
key := groupMaxSeq + groupID
seq, err := d.RDB.Incr(context.Background(), key).Result()
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
}
func (d *DataBases) SetGroupMaxSeq(groupID string, maxSeq uint64) error {
func (r *RedisClient) SetGroupMaxSeq(groupID string, maxSeq uint64) error {
key := groupMaxSeq + groupID
return d.RDB.Set(context.Background(), key, maxSeq, 0).Err()
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
func (d *DataBases) SetGroupMinSeq(groupID string, minSeq uint32) error {
func (r *RedisClient) SetGroupMinSeq(groupID string, minSeq uint32) error {
key := groupMinSeq + groupID
return d.RDB.Set(context.Background(), key, minSeq, 0).Err()
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
//Store userid and platform class to redis
func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error {
func (r *RedisClient) AddTokenFlag(userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
log2.NewDebug("", "add token key is ", key)
return d.RDB.HSet(context.Background(), key, token, flag).Err()
return r.rdb.HSet(context.Background(), key, token, flag).Err()
}
func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) {
func (r *RedisClient) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
log2.NewDebug("", "get token key is ", key)
m, err := d.RDB.HGetAll(context.Background(), key).Result()
m, err := r.rdb.HGetAll(context.Background(), key).Result()
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, err
}
func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error {
func (r *RedisClient) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return d.RDB.HSet(context.Background(), key, mm).Err()
return r.rdb.HSet(context.Background(), key, mm).Err()
}
func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error {
func (r *RedisClient) DeleteTokenByUidPid(userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return d.RDB.HDel(context.Background(), key, fields...).Err()
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error {
func (r *RedisClient) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error {
key := conversationReceiveMessageOpt + userID
return d.RDB.HSet(context.Background(), key, conversationID, opt).Err()
return r.rdb.HSet(context.Background(), key, conversationID, opt).Err()
}
func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) {
func (r *RedisClient) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) {
key := conversationReceiveMessageOpt + userID
result, err := d.RDB.HGet(context.Background(), key, conversationID).Result()
result, err := r.rdb.HGet(context.Background(), key, conversationID).Result()
return utils.StringToInt(result), err
}
func (d *DataBases) SetUserGlobalMsgRecvOpt(userID string, opt int32) error {
func (r *RedisClient) SetUserGlobalMsgRecvOpt(userID string, opt int32) error {
key := conversationReceiveMessageOpt + userID
return d.RDB.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err()
return r.rdb.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err()
}
func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) {
func (r *RedisClient) GetUserGlobalMsgRecvOpt(userID string) (int, error) {
key := conversationReceiveMessageOpt + userID
result, err := d.RDB.HGet(context.Background(), key, GlobalMsgRecvOpt).Result()
result, err := r.rdb.HGet(context.Background(), key, GlobalMsgRecvOpt).Result()
if err != nil {
if err == go_redis.Nil {
return 0, nil
@ -180,11 +220,11 @@ func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) {
}
return utils.StringToInt(result), err
}
func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) {
func (r *RedisClient) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := d.RDB.Get(context.Background(), key).Result()
result, err := r.rdb.Get(context.Background(), key).Result()
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
@ -206,9 +246,9 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati
return seqMsg, failedSeqList, errResult
}
func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) {
func (r *RedisClient) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) {
ctx := context.Background()
pipe := d.RDB.Pipeline()
pipe := r.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
@ -219,7 +259,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
}
log2.NewDebug(operationID, "convert string is ", s)
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = d.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
//err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error())
failedList = append(failedList, *msg)
@ -231,11 +271,11 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
_, err := pipe.Exec(ctx)
return err, 0
}
func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
func (r *RedisClient) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
ctx := context.Background()
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
err := d.RDB.Del(ctx, key).Err()
err := r.rdb.Del(ctx, key).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, uid, err.Error(), msgList)
}
@ -243,10 +283,10 @@ func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid st
return nil
}
func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {
func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {
ctx := context.Background()
key := messageCache + userID + "_" + "*"
vals, err := d.RDB.Keys(ctx, key).Result()
vals, err := r.rdb.Keys(ctx, key).Result()
log2.Debug(operationID, "vals: ", vals)
if err == go_redis.Nil {
return nil
@ -255,12 +295,12 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str
return utils.Wrap(err, "")
}
for _, v := range vals {
err = d.RDB.Del(ctx, v).Err()
err = r.rdb.Del(ctx, v).Err()
}
return nil
}
func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) {
func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, err
@ -293,16 +333,16 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData,
return false, err
}
keyList := SignalListCache + userID
err = d.RDB.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
if err != nil {
return false, err
}
err = d.RDB.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
}
key := SignalCache + msg.ClientMsgID
err = d.RDB.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
}
@ -311,10 +351,10 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData,
return true, nil
}
func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key := SignalCache + clientMsgID
invitationInfo = &pbRtc.SignalInviteReq{}
bytes, err := d.RDB.Get(context.Background(), key).Bytes()
bytes, err := r.rdb.Get(context.Background(), key).Bytes()
if err != nil {
return nil, err
}
@ -333,9 +373,9 @@ func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (inv
return invitationInfo, err
}
func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := SignalListCache + userID
result := d.RDB.LPop(context.Background(), keyList)
result := r.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
@ -344,27 +384,27 @@ func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationI
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
log2.NewDebug("", utils.GetSelfFuncName(), result, result.String())
invitationInfo, err = d.GetSignalInfoFromCacheByClientMsgID(key)
invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(key)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
err = d.DelUserSignalList(userID)
err = r.DelUserSignalList(userID)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil
}
func (d *DataBases) DelUserSignalList(userID string) error {
func (r *RedisClient) DelUserSignalList(userID string) error {
keyList := SignalListCache + userID
err := d.RDB.Del(context.Background(), keyList).Err()
err := r.rdb.Del(context.Background(), keyList).Err()
return err
}
func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) {
func (r *RedisClient) DelMsgFromCache(uid string, seqList []uint32, operationID string) {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := d.RDB.Get(context.Background(), key).Result()
result, err := r.rdb.Get(context.Background(), key).Result()
if err != nil {
if err == go_redis.Nil {
log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil")
@ -384,36 +424,36 @@ func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID st
log2.Error(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg, err.Error())
continue
}
if err := d.RDB.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "Set failed", err.Error())
}
}
}
func (d *DataBases) SetGetuiToken(token string, expireTime int64) error {
return d.RDB.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err()
func (r *RedisClient) SetGetuiToken(token string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err()
}
func (d *DataBases) GetGetuiToken() (string, error) {
result, err := d.RDB.Get(context.Background(), getuiToken).Result()
func (r *RedisClient) GetGetuiToken() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiToken).Result()
return result, err
}
func (d *DataBases) SetGetuiTaskID(taskID string, expireTime int64) error {
return d.RDB.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()
func (r *RedisClient) SetGetuiTaskID(taskID string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()
}
func (d *DataBases) GetGetuiTaskID() (string, error) {
result, err := d.RDB.Get(context.Background(), getuiTaskID).Result()
func (r *RedisClient) GetGetuiTaskID() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiTaskID).Result()
return result, err
}
func (d *DataBases) SetSendMsgStatus(status int32, operationID string) error {
return d.RDB.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err()
func (r *RedisClient) SetSendMsgStatus(status int32, operationID string) error {
return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err()
}
func (d *DataBases) GetSendMsgStatus(operationID string) (int, error) {
result, err := d.RDB.Get(context.Background(), sendMsgFailedFlag+operationID).Result()
func (r *RedisClient) GetSendMsgStatus(operationID string) (int, error) {
result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result()
if err != nil {
return 0, err
}
@ -421,36 +461,36 @@ func (d *DataBases) GetSendMsgStatus(operationID string) (int, error) {
return status, err
}
func (d *DataBases) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) {
func (r *RedisClient) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return d.RDB.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err()
return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err()
}
func (d *DataBases) GetFcmToken(account string, platformID int) (string, error) {
func (r *RedisClient) GetFcmToken(account string, platformID int) (string, error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return d.RDB.Get(context.Background(), key).Result()
return r.rdb.Get(context.Background(), key).Result()
}
func (d *DataBases) DelFcmToken(account string, platformID int) error {
func (r *RedisClient) DelFcmToken(account string, platformID int) error {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return d.RDB.Del(context.Background(), key).Err()
return r.rdb.Del(context.Background(), key).Err()
}
func (d *DataBases) IncrUserBadgeUnreadCountSum(uid string) (int, error) {
func (r *RedisClient) IncrUserBadgeUnreadCountSum(uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := d.RDB.Incr(context.Background(), key).Result()
seq, err := r.rdb.Incr(context.Background(), key).Result()
return int(seq), err
}
func (d *DataBases) SetUserBadgeUnreadCountSum(uid string, value int) error {
func (r *RedisClient) SetUserBadgeUnreadCountSum(uid string, value int) error {
key := userBadgeUnreadCountSum + uid
return d.RDB.Set(context.Background(), key, value, 0).Err()
return r.rdb.Set(context.Background(), key, value, 0).Err()
}
func (d *DataBases) GetUserBadgeUnreadCountSum(uid string) (int, error) {
func (r *RedisClient) GetUserBadgeUnreadCountSum(uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := d.RDB.Get(context.Background(), key).Result()
seq, err := r.rdb.Get(context.Background(), key).Result()
return utils.StringToInt(seq), err
}
func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) {
func (r *RedisClient) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
n, err := d.RDB.Exists(context.Background(), key).Result()
n, err := r.rdb.Exists(context.Background(), key).Result()
if n > 0 {
return true, err
} else {
@ -458,38 +498,38 @@ func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType i
}
}
func (d *DataBases) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) {
func (r *RedisClient) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return d.RDB.HGetAll(context.Background(), key).Result()
return r.rdb.HGetAll(context.Background(), key).Result()
}
func (d *DataBases) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error {
func (r *RedisClient) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return d.RDB.HDel(context.Background(), key, subKey).Err()
return r.rdb.HDel(context.Background(), key, subKey).Err()
}
func (d *DataBases) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
func (r *RedisClient) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return d.RDB.Expire(context.Background(), key, expiration).Result()
return r.rdb.Expire(context.Background(), key, expiration).Result()
}
func (d *DataBases) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
func (r *RedisClient) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
result, err := d.RDB.HGet(context.Background(), key, typeKey).Result()
result, err := r.rdb.HGet(context.Background(), key, typeKey).Result()
return result, err
}
func (d *DataBases) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
func (r *RedisClient) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return d.RDB.HSet(context.Background(), key, typeKey, value).Err()
return r.rdb.HSet(context.Background(), key, typeKey, value).Err()
}
func (d *DataBases) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
func (r *RedisClient) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return d.RDB.SetNX(context.Background(), key, 1, time.Minute).Err()
return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err()
}
func (d *DataBases) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error {
func (r *RedisClient) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return d.RDB.Del(context.Background(), key).Err()
return r.rdb.Del(context.Background(), key).Err()
}

View File

@ -1,4 +1,4 @@
package db
package cache
import (
"Open_IM/pkg/common/constant"

View File

@ -1,15 +1,16 @@
package rocksCache
package cache
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/db/mysql"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/trace_log"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"math/big"
"sort"
"strconv"
@ -36,8 +37,19 @@ const (
extendMsgCache = "EXTEND_MSG_CACHE:"
)
func DelKeys() {
fmt.Println("cache init to del old keys")
const scanCount = 3000
type RcClient struct {
rdb redis.UniversalClient
Cache *rockscache.Client
ExpireTime time.Duration
}
func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient {
return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime}
}
func (rc *RcClient) DelKeys() {
for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache,
groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} {
fName := utils.GetSelfFuncName()
@ -46,16 +58,16 @@ func DelKeys() {
for {
var keys []string
var err error
keys, cursor, err = db.DB.RDB.Scan(context.Background(), cursor, key+"*", 3000).Result()
keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
if err != nil {
panic(err.Error())
}
n += len(keys)
// for each for redis cluster
for _, key := range keys {
if err = db.DB.RDB.Del(context.Background(), key).Err(); err != nil {
if err = rc.rdb.Del(context.Background(), key).Err(); err != nil {
log.NewError("", fName, key, err.Error())
err = db.DB.RDB.Del(context.Background(), key).Err()
err = rc.rdb.Del(context.Background(), key).Err()
if err != nil {
panic(err.Error())
}
@ -68,9 +80,9 @@ func DelKeys() {
}
}
func GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) {
func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) {
getFriendIDList := func() (string, error) {
friendIDList, err := imdb.GetFriendIDListByUserID(userID)
friendIDList, err := mysql.GetFriendIDListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -100,7 +112,7 @@ func DelFriendIDListFromCache(ctx context.Context, userID string) (err error) {
func GetBlackListFromCache(ctx context.Context, userID string) (blackIDs []string, err error) {
getBlackIDList := func() (string, error) {
blackIDs, err := imdb.GetBlackIDListByUserID(userID)
blackIDs, err := mysql.GetBlackIDListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -130,7 +142,7 @@ func DelBlackIDListFromCache(ctx context.Context, userID string) (err error) {
func GetJoinedGroupIDListFromCache(ctx context.Context, userID string) (joinedGroupList []string, err error) {
getJoinedGroupIDList := func() (string, error) {
joinedGroupList, err := imdb.GetJoinedGroupIDListByUserID(userID)
joinedGroupList, err := mysql.GetJoinedGroupIDListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -172,7 +184,7 @@ func GetGroupMemberIDListFromCache(ctx context.Context, groupID string) (groupMe
}
groupMemberIDList = superGroup.MemberIDList
} else {
groupMemberIDList, err = imdb.GetGroupMemberIDListByGroupID(groupID)
groupMemberIDList, err = mysql.GetGroupMemberIDListByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -201,9 +213,9 @@ func DelGroupMemberIDListFromCache(ctx context.Context, groupID string) (err err
return db.DB.Rc.TagAsDeleted(groupCache + groupID)
}
func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *imdb.User, err error) {
func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *mysql.User, err error) {
getUserInfo := func() (string, error) {
userInfo, err := imdb.GetUserByUserID(userID)
userInfo, err := mysql.GetUserByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -220,13 +232,13 @@ func GetUserInfoFromCache(ctx context.Context, userID string) (userInfo *imdb.Us
if err != nil {
return nil, utils.Wrap(err, "")
}
userInfo = &imdb.User{}
userInfo = &mysql.User{}
err = json.Unmarshal([]byte(userInfoStr), userInfo)
return userInfo, utils.Wrap(err, "")
}
func GetUserInfoFromCacheBatch(ctx context.Context, userIDs []string) ([]*imdb.User, error) {
var users []*imdb.User
func GetUserInfoFromCacheBatch(ctx context.Context, userIDs []string) ([]*mysql.User, error) {
var users []*mysql.User
for _, userID := range userIDs {
user, err := GetUserInfoFromCache(ctx, userID)
if err != nil {
@ -244,9 +256,9 @@ func DelUserInfoFromCache(ctx context.Context, userID string) (err error) {
return db.DB.Rc.TagAsDeleted(userInfoCache + userID)
}
func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (groupMember *imdb.GroupMember, err error) {
func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (groupMember *mysql.GroupMember, err error) {
getGroupMemberInfo := func() (string, error) {
groupMemberInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
groupMemberInfo, err := mysql.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -263,7 +275,7 @@ func GetGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (g
if err != nil {
return nil, utils.Wrap(err, "")
}
groupMember = &imdb.GroupMember{}
groupMember = &mysql.GroupMember{}
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
return groupMember, utils.Wrap(err, "")
}
@ -275,7 +287,7 @@ func DelGroupMemberInfoFromCache(ctx context.Context, groupID, userID string) (e
return db.DB.Rc.TagAsDeleted(groupMemberInfoCache + groupID + "-" + userID)
}
func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, groupID string) (groupMembers []*imdb.GroupMember, err error) {
func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, groupID string) (groupMembers []*mysql.GroupMember, err error) {
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
}()
@ -286,7 +298,7 @@ func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, grou
if count < 0 || offset < 0 {
return nil, nil
}
var groupMemberList []*imdb.GroupMember
var groupMemberList []*mysql.GroupMember
var start, stop int32
start = offset
stop = offset + count
@ -322,12 +334,12 @@ func GetGroupMembersInfoFromCache(ctx context.Context, count, offset int32, grou
return groupMemberList, nil
}
func GetAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (groupMembers []*imdb.GroupMember, err error) {
func GetAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (groupMembers []*mysql.GroupMember, err error) {
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMembers", groupMembers)
}()
getGroupMemberInfo := func() (string, error) {
groupMembers, err := imdb.GetGroupMemberListByGroupID(groupID)
groupMembers, err := mysql.GetGroupMemberListByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -352,9 +364,9 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e
return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID)
}
func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *imdb.Group, err error) {
func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
getGroupInfo := func() (string, error) {
groupInfo, err := imdb.GetGroupInfoByGroupID(groupID)
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -364,7 +376,7 @@ func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *imdb
}
return string(bytes), nil
}
groupInfo = &imdb.Group{}
groupInfo = &mysql.Group{}
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
}()
@ -383,9 +395,9 @@ func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID)
}
func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*imdb.Friend, err error) {
func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) {
getAllFriendInfo := func() (string, error) {
friendInfoList, err := imdb.GetFriendListByUserID(userID)
friendInfoList, err := mysql.GetFriendListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -485,7 +497,7 @@ func DelGroupMemberListHashFromCache(ctx context.Context, groupID string) (err e
func GetGroupMemberNumFromCache(ctx context.Context, groupID string) (num int, err error) {
getGroupMemberNum := func() (string, error) {
num, err := imdb.GetGroupMemberNumByGroupID(groupID)
num, err := mysql.GetGroupMemberNumByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
@ -510,7 +522,7 @@ func DelGroupMemberNumFromCache(ctx context.Context, groupID string) (err error)
func GetUserConversationIDListFromCache(ctx context.Context, userID string) (conversationIDs []string, err error) {
getConversationIDList := func() (string, error) {
conversationIDList, err := imdb.GetConversationIDListByUserID(userID)
conversationIDList, err := mysql.GetConversationIDListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "getConversationIDList failed")
}
@ -539,9 +551,9 @@ func DelUserConversationIDListFromCache(ctx context.Context, userID string) (err
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err")
}
func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID string) (conversation *imdb.Conversation, err error) {
func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID string) (conversation *mysql.Conversation, err error) {
getConversation := func() (string, error) {
conversation, err := imdb.GetConversation(ownerUserID, conversationID)
conversation, err := mysql.GetConversation(ownerUserID, conversationID)
if err != nil {
return "", utils.Wrap(err, "get failed")
}
@ -558,12 +570,12 @@ func GetConversationFromCache(ctx context.Context, ownerUserID, conversationID s
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
conversation = &imdb.Conversation{}
conversation = &mysql.Conversation{}
err = json.Unmarshal([]byte(conversationStr), &conversation)
return conversation, utils.Wrap(err, "Unmarshal failed")
}
func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []imdb.Conversation, err error) {
func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []mysql.Conversation, err error) {
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations)
}()
@ -577,7 +589,7 @@ func GetConversationsFromCache(ctx context.Context, ownerUserID string, conversa
return conversations, nil
}
func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conversations []imdb.Conversation, err error) {
func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conversations []mysql.Conversation, err error) {
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations)
}()
@ -585,7 +597,7 @@ func GetUserAllConversationList(ctx context.Context, ownerUserID string) (conver
if err != nil {
return nil, err
}
var conversationList []imdb.Conversation
var conversationList []mysql.Conversation
log.NewDebug("", utils.GetSelfFuncName(), IDList)
for _, conversationID := range IDList {
conversation, err := GetConversationFromCache(ctx, ownerUserID, conversationID)
@ -604,7 +616,7 @@ func DelConversationFromCache(ctx context.Context, ownerUserID, conversationID s
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
}
func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *db.ExtendMsg, err error) {
func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *mongo.ExtendMsg, err error) {
getExtendMsg := func() (string, error) {
extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
if err != nil {
@ -624,7 +636,7 @@ func GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clien
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsg = &db.ExtendMsg{}
extendMsg = &mongo.ExtendMsg{}
err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
return extendMsg, utils.Wrap(err, "Unmarshal failed")
}

View File

@ -0,0 +1,111 @@
package model
import (
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/mysql"
"Open_IM/pkg/common/trace_log"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"gorm.io/gorm"
"time"
)
type GroupModel struct {
strongRc *cache.RcClient
weakRc *cache.RcClient
db *mysql.Group
rdb *cache.RedisClient
}
const GroupExpireTime = time.Second * 300 * 60
const RandomExpireAdjustment = 0.2
//cache key
const groupInfoCache = "GROUP_INFO_CACHE:"
func NewGroupModel(ctx context.Context) {
var groupModel GroupModel
redisClient := cache.InitRedis(ctx)
rdb := cache.NewRedisClient(redisClient)
groupModel.rdb = rdb
groupModel.db = mysql.NewGroupDB()
groupModel.strongRc = cache.NewRcClient(redisClient, GroupExpireTime, rockscache.Options{
RandomExpireAdjustment: RandomExpireAdjustment,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
})
groupModel.weakRc = cache.NewRcClient(redisClient, GroupExpireTime, rockscache.Options{
RandomExpireAdjustment: RandomExpireAdjustment,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: false,
})
}
func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) {
for _, groupID := range groupIDs {
group, err := g.getGroupInfoFromCache(ctx, groupID)
if err != nil {
return nil, err
}
groups = append(groups, group)
}
return groups, nil
}
func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error {
return g.db.Create(ctx, groups)
}
func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error {
tx := g.db.DB.Begin()
if err := g.db.Delete(ctx, groupIDs); err != nil {
tx.Commit()
return err
}
if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil {
tx.Rollback()
return err
}
return nil
}
func (g *GroupModel) getGroupCacheKey(groupID string) string {
return groupInfoCache + groupID
}
func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error {
for _, groupID := range groupIDs {
if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil {
return err
}
}
return nil
}
func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
getGroupInfo := func() (string, error) {
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(groupInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
groupInfo = &mysql.Group{}
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
}()
groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo)
if err != nil {
return nil, utils.Wrap(err, "")
}
err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
return groupInfo, utils.Wrap(err, "")
}

View File

@ -1,8 +1,9 @@
package db
package mongo
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
pbMsg "Open_IM/pkg/proto/msg"
@ -15,7 +16,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)
func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
func (d *db.DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
newTime := getCurrentTimestampByMill()
if len(msgList) > GetSingleGocMsgNum() {
return errors.New("too large")
@ -24,17 +25,17 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
var remain uint64
blk0 := uint64(GetSingleGocMsgNum() - 1)
//currentMaxSeq 4998
if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
if currentMaxSeq < uint64(mongo2.GetSingleGocMsgNum()) {
remain = blk0 - currentMaxSeq //1
} else {
excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999
remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
remain = (uint64(mongo2.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(mongo2.GetSingleGocMsgNum()))) % uint64(mongo2.GetSingleGocMsgNum())
}
//remain=1
insertCounter := uint64(0)
msgListToMongo := make([]MsgInfo, 0)
msgListToMongoNext := make([]MsgInfo, 0)
msgListToMongo := make([]mongo2.MsgInfo, 0)
msgListToMongoNext := make([]mongo2.MsgInfo, 0)
seqUid := ""
seqUidNext := ""
log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
@ -42,7 +43,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
for _, m := range msgList {
log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
currentMaxSeq++
sMsg := MsgInfo{}
sMsg := mongo2.MsgInfo{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
log.Debug(operationID, "mongo msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", userID, "seq: ", currentMaxSeq)
@ -51,24 +52,24 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
}
if isInit {
msgListToMongoNext = append(msgListToMongoNext, sMsg)
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
continue
}
if insertCounter < remain {
msgListToMongo = append(msgListToMongo, sMsg)
insertCounter++
seqUid = getSeqUid(userID, uint32(currentMaxSeq))
seqUid = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else {
msgListToMongoNext = append(msgListToMongoNext, sMsg)
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
seqUidNext = mongo2.getSeqUid(userID, uint32(currentMaxSeq))
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
}
}
ctx := context.Background()
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(mongo2.cChat)
if seqUid != "" {
filter := bson.M{"uid": seqUid}
@ -77,7 +78,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
if err != nil {
if err == mongo.ErrNoDocuments {
filter := bson.M{"uid": seqUid}
sChat := UserChat{}
sChat := mongo2.UserChat{}
sChat.UID = seqUid
sChat.Msg = msgListToMongo
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
@ -98,7 +99,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
}
if seqUidNext != "" {
filter := bson.M{"uid": seqUidNext}
sChat := UserChat{}
sChat := mongo2.UserChat{}
sChat.UID = seqUidNext
sChat.Msg = msgListToMongoNext
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
@ -109,14 +110,14 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
}
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
}
log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
return nil
}
func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {
newTime := getCurrentTimestampByMill()
func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {
newTime := mongo2.getCurrentTimestampByMill()
lenList := len(msgList)
if lenList > GetSingleGocMsgNum() {
if lenList > mongo2.GetSingleGocMsgNum() {
return errors.New("too large"), 0
}
if lenList < 1 {
@ -142,7 +143,7 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
for _, m := range msgList {
currentMaxSeq++
sMsg := MsgInfo{}
sMsg := mongo2.MsgInfo{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq)
@ -155,7 +156,7 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
} else {
promePkg.PromeInc(promePkg.MsgInsertRedisSuccessCounter)
}
log.Debug(operationID, "batch to redis cost time ", getCurrentTimestampByMill()-newTime, insertID, len(msgList))
log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, insertID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = d.SetGroupMaxSeq(insertID, currentMaxSeq)
} else {

View File

@ -1,7 +1,8 @@
package db
package mongo
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
@ -57,7 +58,7 @@ func SplitSourceIDAndGetIndex(sourceID string) int32 {
return int32(index)
}
func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error {
func (d *db.DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.InsertOne(ctx, set)
@ -68,7 +69,7 @@ type GetAllExtendMsgSetOpts struct {
ExcludeExtendMsgs bool
}
func (d *DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) {
func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
regex := fmt.Sprintf("^%s", ID)
@ -90,7 +91,7 @@ func (d *DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts)
return sets, nil
}
func (d *DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) {
func (d *db.DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) {
regex := fmt.Sprintf("^%s", sourceID)
var err error
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
@ -114,7 +115,7 @@ func (d *DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessio
}
// first modify msg
func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error {
func (d *db.DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, 0, c)
@ -141,7 +142,7 @@ func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *Ext
}
// insert or update
func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
func (d *db.DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
var updateBson = bson.M{}
@ -164,7 +165,7 @@ func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionT
}
// delete TypeKey
func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
func (d *db.DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
var updateBson = bson.M{}
@ -182,7 +183,7 @@ func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int3
return err
}
func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) {
func (d *db.DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1})

View File

@ -1,58 +1,19 @@
package db
package mongo
import (
"Open_IM/pkg/common/config"
"github.com/dtm-labs/rockscache"
"go.mongodb.org/mongo-driver/x/bsonx"
"strings"
//"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"fmt"
go_redis "github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo/options"
"gopkg.in/mgo.v2"
"time"
"context"
//"go.mongodb.org/mongo-driver/bson"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
// "go.mongodb.org/mongo-driver/mongo/options"
//go_redis "github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"log"
"strings"
"time"
)
var DB DataBases
type DataBases struct {
MysqlDB mysqlDB
mgoSession *mgo.Session
//redisPool *redis.Pool
mongoClient *mongo.Client
RDB go_redis.UniversalClient
Rc *rockscache.Client
WeakRc *rockscache.Client
}
type RedisClient struct {
client *go_redis.Client
cluster *go_redis.ClusterClient
go_redis.UniversalClient
enableCluster bool
}
func key(dbAddress, dbName string) string {
return dbAddress + "_" + dbName
}
func init() {
var mongoClient *mongo.Client
var err1 error
fmt.Println("init mysql redis mongo ")
initMysqlDB()
// mongo init
// "mongodb://sysop:moon@localhost/records"
func InitMongoClient() *mongo.Client {
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.DBUri != "" {
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
@ -67,7 +28,6 @@ func init() {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" {
// clientOpts := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018/?replicaSet=replset")
//mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
@ -81,13 +41,13 @@ func init() {
config.Config.Mongo.DBMaxPoolSize)
}
}
log.Println("start to init mongoDB:", uri)
mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
mongoClient, err1 = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err1 != nil {
panic(err1.Error() + " mongo.Connect failed " + uri)
mongoClient, err = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
panic(err.Error() + " mongo.Connect failed " + uri)
}
}
// mongodb create index
@ -95,7 +55,7 @@ func init() {
panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time")
}
if err := createMongoIndex(mongoClient, cChat, false, "uid"); err != nil {
fmt.Println(err.Error() + " index create failed " + cChat + " uid ")
fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid")
}
if err := createMongoIndex(mongoClient, cWorkMoment, true, "-create_time", "work_moment_id"); err != nil {
panic(err.Error() + "index create failed " + cWorkMoment + " -create_time, work_moment_id")
@ -112,44 +72,7 @@ func init() {
if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil {
panic(err.Error() + "index create failed " + cTag + " tag_id")
}
DB.mongoClient = mongoClient
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if config.Config.Redis.EnableCluster {
DB.RDB = go_redis.NewClusterClient(&go_redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
_, err = DB.RDB.Ping(ctx).Result()
if err != nil {
fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress)
panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
} else {
DB.RDB = go_redis.NewClient(&go_redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
_, err = DB.RDB.Ping(ctx).Result()
if err != nil {
panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
}
// 强一致性缓存当一个key被标记删除其他请求线程会被锁住轮询直到新的key生成适合各种同步的拉取, 如果弱一致可能导致拉取还是老数据,毫无意义
DB.Rc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions())
DB.Rc.Options.StrongConsistency = true
// 弱一致性缓存当一个key被标记删除其他请求线程直接返回该key的value适合高频并且生成很缓存很慢的情况 如大群发消息缓存的缓存
DB.WeakRc = rockscache.NewClient(DB.RDB, rockscache.NewDefaultOptions())
DB.WeakRc.Options.StrongConsistency = false
fmt.Println("init mysql redis mongo ok ")
return mongoClient
}
func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error {
@ -159,7 +82,7 @@ func createMongoIndex(client *mongo.Client, collection string, isUnique bool, ke
indexView := db.Indexes()
keysDoc := bsonx.Doc{}
// 复合索引
// create composite indexes
for _, key := range keys {
if strings.HasPrefix(key, "-") {
keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
@ -168,7 +91,7 @@ func createMongoIndex(client *mongo.Client, collection string, isUnique bool, ke
}
}
// 创建索引
// create index
index := mongo.IndexModel{
Keys: keysDoc,
}

View File

@ -1,8 +1,9 @@
package db
package mongo
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
@ -59,7 +60,7 @@ type GroupMember_x struct {
var ErrMsgListNotExist = errors.New("user not have msg in mongoDB")
func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
func (d *db.DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
return 1, nil
//var i, NB uint32
//var seqUid string
@ -89,12 +90,12 @@ func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
//return MinSeq, nil
}
func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) {
func (d *db.DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) {
return 1, nil
}
// deleteMsgByLogic
func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) {
func (d *db.DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) {
log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList)
sortkeys.Uint32s(seqList)
suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 {
@ -130,7 +131,7 @@ func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID
return totalUnexistSeqList, err
}
func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) {
func (d *db.DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) {
log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList)
seqMsgList, indexList, unexistSeqList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID)
if err != nil {
@ -145,7 +146,7 @@ func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint3
}
// deleteMsgByLogic
func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error {
func (d *db.DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error {
sortkeys.Uint32s(seqList)
seqMsgs, err := d.GetMsgBySeqListMongo2(uid, seqList, operationID)
if err != nil {
@ -161,7 +162,7 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string
return nil
}
func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error {
func (d *db.DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error {
log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@ -182,7 +183,7 @@ func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgD
return nil
}
func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error {
func (d *db.DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error {
log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg)
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@ -207,14 +208,14 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat
return nil
}
func (d *DataBases) UpdateOneMsgList(msg *UserChat) error {
func (d *db.DataBases) UpdateOneMsgList(msg *UserChat) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
_, err := c.UpdateOne(ctx, bson.M{"uid": msg.UID}, bson.M{"$set": bson.M{"msg": msg.Msg}})
return err
}
func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList)
var hasSeqList []uint32
singleCount := 0
@ -270,7 +271,7 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st
return seqMsg, nil
}
func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) {
func (d *db.DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@ -292,14 +293,14 @@ func (d *DataBases) GetUserMsgListByIndex(ID string, index int64) (*UserChat, er
}
}
func (d *DataBases) DelMongoMsgs(IDList []string) error {
func (d *db.DataBases) DelMongoMsgs(IDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
_, err := c.DeleteMany(ctx, bson.M{"uid": bson.M{"$in": IDList}})
return err
}
func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) {
func (d *db.DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
userChat := &UserChat{}
@ -327,7 +328,7 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replac
return replaceMaxSeq, err
}
func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@ -355,7 +356,7 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error
return nil, nil
}
func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@ -390,7 +391,7 @@ func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error
return nil, nil
}
func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@ -441,7 +442,7 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio
}
return seqMsg, nil
}
func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
func (d *db.DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
@ -493,7 +494,7 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin
return seqMsg, nil
}
func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {
func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
sChat := UserChat{}
@ -548,7 +549,7 @@ func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (e
return exceptionMsg
}
func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
func (d *db.DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
newTime := getCurrentTimestampByMill()
@ -620,7 +621,7 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD
// return nil
//}
func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
func (d *db.DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
var seqUid string
newTime := getCurrentTimestampByMill()
session := d.mgoSession.Clone()
@ -659,7 +660,7 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD
return nil
}
func (d *DataBases) DelUserChat(uid string) error {
func (d *db.DataBases) DelUserChat(uid string) error {
return nil
//session := d.mgoSession.Clone()
//if session == nil {
@ -677,7 +678,7 @@ func (d *DataBases) DelUserChat(uid string) error {
//return nil
}
func (d *DataBases) DelUserChatMongo2(uid string) error {
func (d *db.DataBases) DelUserChatMongo2(uid string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
filter := bson.M{"uid": uid}
@ -689,7 +690,7 @@ func (d *DataBases) DelUserChatMongo2(uid string) error {
return nil
}
func (d *DataBases) MgoUserCount() (int, error) {
func (d *db.DataBases) MgoUserCount() (int, error) {
return 0, nil
//session := d.mgoSession.Clone()
//if session == nil {
@ -702,7 +703,7 @@ func (d *DataBases) MgoUserCount() (int, error) {
//return c.Find(nil).Count()
}
func (d *DataBases) MgoSkipUID(count int) (string, error) {
func (d *db.DataBases) MgoSkipUID(count int) (string, error) {
return "", nil
//session := d.mgoSession.Clone()
//if session == nil {
@ -717,7 +718,7 @@ func (d *DataBases) MgoSkipUID(count int) (string, error) {
//return sChat.UID, nil
}
func (d *DataBases) GetGroupMember(groupID string) []string {
func (d *db.DataBases) GetGroupMember(groupID string) []string {
return nil
//groupInfo := GroupMember_x{}
//groupInfo.GroupID = groupID
@ -738,7 +739,7 @@ func (d *DataBases) GetGroupMember(groupID string) []string {
//return groupInfo.UIDList
}
func (d *DataBases) AddGroupMember(groupID, uid string) error {
func (d *db.DataBases) AddGroupMember(groupID, uid string) error {
return nil
//session := d.mgoSession.Clone()
//if session == nil {
@ -771,7 +772,7 @@ func (d *DataBases) AddGroupMember(groupID, uid string) error {
//return nil
}
func (d *DataBases) DelGroupMember(groupID, uid string) error {
func (d *db.DataBases) DelGroupMember(groupID, uid string) error {
return nil
//session := d.mgoSession.Clone()
//if session == nil {
@ -795,7 +796,7 @@ type Tag struct {
UserList []string `bson:"user_list"`
}
func (d *DataBases) GetUserTags(userID string) ([]Tag, error) {
func (d *db.DataBases) GetUserTags(userID string) ([]Tag, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
var tags []Tag
@ -809,7 +810,7 @@ func (d *DataBases) GetUserTags(userID string) ([]Tag, error) {
return tags, nil
}
func (d *DataBases) CreateTag(userID, tagName string, userList []string) error {
func (d *db.DataBases) CreateTag(userID, tagName string, userList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
tagID := generateTagID(tagName, userID)
@ -823,7 +824,7 @@ func (d *DataBases) CreateTag(userID, tagName string, userList []string) error {
return err
}
func (d *DataBases) GetTagByID(userID, tagID string) (Tag, error) {
func (d *db.DataBases) GetTagByID(userID, tagID string) (Tag, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
var tag Tag
@ -831,14 +832,14 @@ func (d *DataBases) GetTagByID(userID, tagID string) (Tag, error) {
return tag, err
}
func (d *DataBases) DeleteTag(userID, tagID string) error {
func (d *db.DataBases) DeleteTag(userID, tagID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
_, err := c.DeleteOne(ctx, bson.M{"user_id": userID, "tag_id": tagID})
return err
}
func (d *DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error {
func (d *db.DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
var tag Tag
@ -873,7 +874,7 @@ func (d *DataBases) SetTag(userID, tagID, newName string, increaseUserIDList []s
return nil
}
func (d *DataBases) GetUserIDListByTagID(userID, tagID string) ([]string, error) {
func (d *db.DataBases) GetUserIDListByTagID(userID, tagID string) ([]string, error) {
var tag Tag
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cTag)
@ -894,14 +895,14 @@ type TagSendLog struct {
SendTime int64 `bson:"send_time"`
}
func (d *DataBases) SaveTagSendLog(tagSendLog *TagSendLog) error {
func (d *db.DataBases) SaveTagSendLog(tagSendLog *TagSendLog) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSendLog)
_, err := c.InsertOne(ctx, tagSendLog)
return err
}
func (d *DataBases) GetTagSendLogs(userID string, showNumber, pageNumber int32) ([]TagSendLog, error) {
func (d *db.DataBases) GetTagSendLogs(userID string, showNumber, pageNumber int32) ([]TagSendLog, error) {
var tagSendLogs []TagSendLog
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSendLog)
@ -947,7 +948,7 @@ type Comment struct {
CreateTime int32 `bson:"create_time" json:"create_time"`
}
func (d *DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error {
func (d *db.DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
workMomentID := generateWorkMomentID(workMoment.UserID)
@ -957,14 +958,14 @@ func (d *DataBases) CreateOneWorkMoment(workMoment *WorkMoment) error {
return err
}
func (d *DataBases) DeleteOneWorkMoment(workMomentID string) error {
func (d *db.DataBases) DeleteOneWorkMoment(workMomentID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
_, err := c.DeleteOne(ctx, bson.M{"work_moment_id": workMomentID})
return err
}
func (d *DataBases) DeleteComment(workMomentID, contentID, opUserID string) error {
func (d *db.DataBases) DeleteComment(workMomentID, contentID, opUserID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
_, err := c.UpdateOne(ctx, bson.D{{"work_moment_id", workMomentID},
@ -976,7 +977,7 @@ func (d *DataBases) DeleteComment(workMomentID, contentID, opUserID string) erro
return err
}
func (d *DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error) {
func (d *db.DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
workMoment := &WorkMoment{}
@ -984,7 +985,7 @@ func (d *DataBases) GetWorkMomentByID(workMomentID string) (*WorkMoment, error)
return workMoment, err
}
func (d *DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string) (*WorkMoment, bool, error) {
func (d *db.DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string) (*WorkMoment, bool, error) {
workMoment, err := d.GetWorkMomentByID(workMomentID)
if err != nil {
return nil, false, err
@ -1006,11 +1007,11 @@ func (d *DataBases) LikeOneWorkMoment(likeUserID, userName, workMomentID string)
return workMoment, !isAlreadyLike, err
}
func (d *DataBases) SetUserWorkMomentsLevel(userID string, level int32) error {
func (d *db.DataBases) SetUserWorkMomentsLevel(userID string, level int32) error {
return nil
}
func (d *DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string) (WorkMoment, error) {
func (d *db.DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string) (WorkMoment, error) {
comment.ContentID = generateWorkMomentCommentID(workMomentID)
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
@ -1019,7 +1020,7 @@ func (d *DataBases) CommentOneWorkMoment(comment *Comment, workMomentID string)
return workMoment, err
}
func (d *DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber int32) ([]WorkMoment, error) {
func (d *db.DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber int32) ([]WorkMoment, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
var workMomentList []WorkMoment
@ -1032,7 +1033,7 @@ func (d *DataBases) GetUserSelfWorkMoments(userID string, showNumber, pageNumber
return workMomentList, err
}
func (d *DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]WorkMoment, error) {
func (d *db.DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]WorkMoment, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
var workMomentList []WorkMoment
@ -1052,7 +1053,7 @@ func (d *DataBases) GetUserWorkMoments(opUserID, userID string, showNumber, page
return workMomentList, err
}
func (d *DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, userID string, friendIDList []string) ([]WorkMoment, error) {
func (d *db.DataBases) GetUserFriendWorkMoments(showNumber, pageNumber int32, userID string, friendIDList []string) ([]WorkMoment, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cWorkMoment)
var workMomentList []WorkMoment
@ -1100,7 +1101,7 @@ type UserToSuperGroup struct {
GroupIDList []string `bson:"group_id_list" json:"groupIDList"`
}
func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error {
func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
@ -1145,7 +1146,7 @@ func (d *DataBases) CreateSuperGroup(groupID string, initMemberIDList []string,
return err
}
func (d *DataBases) GetSuperGroup(groupID string) (SuperGroup, error) {
func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
superGroup := SuperGroup{}
@ -1153,7 +1154,7 @@ func (d *DataBases) GetSuperGroup(groupID string) (SuperGroup, error) {
return superGroup, err
}
func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error {
func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
@ -1192,7 +1193,7 @@ func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) err
return err
}
func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error {
func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
@ -1215,7 +1216,7 @@ func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []strin
return err
}
func (d *DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) {
func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
var user UserToSuperGroup
@ -1223,7 +1224,7 @@ func (d *DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, erro
return user, nil
}
func (d *DataBases) DeleteSuperGroup(groupID string) error {
func (d *db.DataBases) DeleteSuperGroup(groupID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
session, err := d.mongoClient.StartSession()
@ -1247,7 +1248,7 @@ func (d *DataBases) DeleteSuperGroup(groupID string) error {
return nil
}
func (d *DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error {
func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error {
var users []UserToSuperGroup
for _, v := range userIDList {
users = append(users, UserToSuperGroup{
@ -1342,7 +1343,7 @@ func superGroupIndexGen(groupID string, seqSuffix uint32) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (d *DataBases) CleanUpUserMsgFromMongo(userID string, operationID string) error {
func (d *db.DataBases) CleanUpUserMsgFromMongo(userID string, operationID string) error {
ctx := context.Background()
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
maxSeq, err := d.GetUserMaxSeq(userID)

View File

@ -0,0 +1 @@
package mongo

View File

@ -1,157 +0,0 @@
package db
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/utils"
"fmt"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type mysqlDB struct {
//sync.RWMutex
db *gorm.DB
}
type Writer struct{}
func (w Writer) Printf(format string, args ...interface{}) {
fmt.Printf(format, args...)
}
func initMysqlDB() {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
var db *gorm.DB
var err1 error
db, err := gorm.Open(mysql.Open(dsn), nil)
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
db, err1 = gorm.Open(mysql.Open(dsn), nil)
if err1 != nil {
panic(err1.Error() + " open failed " + dsn)
}
}
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName)
err = db.Exec(sql).Error
if err != nil {
panic(err.Error() + " Exec failed " + sql)
}
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
newLogger := logger.New(
Writer{},
logger.Config{
SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold
LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
})
if err != nil {
panic(err.Error() + " Open failed " + dsn)
}
sqlDB, err := db.DB()
if err != nil {
panic(err.Error() + " db.DB() failed ")
}
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime))
sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
db.AutoMigrate(
&im_mysql_model.Friend{},
&im_mysql_model.FriendRequest{},
&im_mysql_model.Group{},
&im_mysql_model.GroupMember{},
&im_mysql_model.GroupRequest{},
&im_mysql_model.User{},
&im_mysql_model.Black{}, &im_mysql_model.ChatLog{}, &im_mysql_model.Conversation{}, &im_mysql_model.AppVersion{}, &im_mysql_model.BlackList{},
)
db.Set("gorm:table_options", "CHARSET=utf8")
db.Set("gorm:table_options", "collation=utf8_unicode_ci")
if !db.Migrator().HasTable(&im_mysql_model.Friend{}) {
db.Migrator().CreateTable(&im_mysql_model.Friend{})
}
if !db.Migrator().HasTable(&im_mysql_model.FriendRequest{}) {
db.Migrator().CreateTable(&im_mysql_model.FriendRequest{})
}
if !db.Migrator().HasTable(&im_mysql_model.Group{}) {
db.Migrator().CreateTable(&im_mysql_model.Group{})
}
if !db.Migrator().HasTable(&im_mysql_model.GroupMember{}) {
db.Migrator().CreateTable(&im_mysql_model.GroupMember{})
}
if !db.Migrator().HasTable(&im_mysql_model.GroupRequest{}) {
db.Migrator().CreateTable(&im_mysql_model.GroupRequest{})
}
if !db.Migrator().HasTable(&im_mysql_model.User{}) {
db.Migrator().CreateTable(&im_mysql_model.User{})
}
if !db.Migrator().HasTable(&im_mysql_model.Black{}) {
db.Migrator().CreateTable(&im_mysql_model.Black{})
}
if !db.Migrator().HasTable(&im_mysql_model.ChatLog{}) {
db.Migrator().CreateTable(&im_mysql_model.ChatLog{})
}
if !db.Migrator().HasTable(&im_mysql_model.Register{}) {
db.Migrator().CreateTable(&im_mysql_model.Register{})
}
if !db.Migrator().HasTable(&im_mysql_model.Conversation{}) {
db.Migrator().CreateTable(&im_mysql_model.Conversation{})
}
DB.MysqlDB.db = db
im_mysql_model.GroupDB = db.Table("groups")
im_mysql_model.GroupMemberDB = db.Table("group_members")
im_mysql_model.UserDB = db.Table("users")
im_mysql_model.ChatLogDB = db.Table("chat_logs")
im_mysql_model.BlackListDB = db.Table("black_lists")
im_mysql_model.BlackDB = db.Table("blacks")
im_mysql_model.AppDB = db.Table("app_version")
im_mysql_model.BlackDB = db.Table("blacks")
im_mysql_model.ConversationDB = db.Table("conversations")
im_mysql_model.FriendDB = db.Table("friends")
im_mysql_model.FriendRequestDB = db.Table("friend_requests")
im_mysql_model.GroupRequestDB = db.Table("group_requests")
InitManager()
}
func InitManager() {
for k, v := range config.Config.Manager.AppManagerUid {
_, err := im_mysql_model.GetUserByUserID(v)
if err != nil {
} else {
continue
}
var appMgr im_mysql_model.User
appMgr.UserID = v
if k == 0 {
appMgr.Nickname = config.Config.Manager.AppSysNotificationName
} else {
appMgr.Nickname = "AppManager" + utils.IntToString(k+1)
}
appMgr.AppMangerLevel = constant.AppAdmin
err = im_mysql_model.UserRegister(appMgr)
if err != nil {
fmt.Println("AppManager insert error ", err.Error(), appMgr)
} else {
fmt.Println("AppManager insert ", appMgr)
}
}
}
func (m *mysqlDB) DefaultGormDB() *gorm.DB {
return DB.MysqlDB.db
}

View File

@ -1,8 +1,15 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
pbMsg "Open_IM/pkg/proto/msg"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
"gorm.io/gorm"
"time"
)
@ -31,6 +38,34 @@ func (ChatLog) TableName() string {
return "chat_logs"
}
func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error {
chatLog := new(ChatLog)
copier.Copy(chatLog, msg.MsgData)
switch msg.MsgData.SessionType {
case constant.GroupChatType, constant.SuperGroupChatType:
chatLog.RecvID = msg.MsgData.GroupID
case constant.SingleChatType:
chatLog.RecvID = msg.MsgData.RecvID
}
if msg.MsgData.ContentType >= constant.NotificationBegin && msg.MsgData.ContentType <= constant.NotificationEnd {
var tips server_api_params.TipsComm
_ = proto.Unmarshal(msg.MsgData.Content, &tips)
marshaler := jsonpb.Marshaler{
OrigName: true,
EnumsAsInts: false,
EmitDefaults: false,
}
chatLog.Content, _ = marshaler.MarshalToString(&tips)
} else {
chatLog.Content = string(msg.MsgData.Content)
}
chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime)
chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime)
log.NewDebug("test", "this is ", chatLog)
return db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").Create(chatLog).Error
}
func GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLog, error) {
mdb := ChatLogDB.Table("chat_logs")
if chatLog.SendTime.Unix() > 0 {

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"gorm.io/gorm"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"gorm.io/gorm"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"gorm.io/gorm"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"fmt"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
//type GroupMember struct {
// GroupID string `gorm:"column:group_id;primaryKey;"`

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/constant"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/constant"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"
@ -8,8 +8,6 @@ import (
"time"
)
var GroupDB *gorm.DB
type Group struct {
GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"`
GroupName string `gorm:"column:name;size:255" json:"groupName"`
@ -26,6 +24,13 @@ type Group struct {
ApplyMemberFriend int32 `gorm:"column:apply_member_friend" json:"applyMemberFriend"`
NotificationUpdateTime time.Time `gorm:"column:notification_update_time"`
NotificationUserID string `gorm:"column:notification_user_id;size:64"`
DB *gorm.DB
}
func NewGroupDB() *Group {
var group Group
group.DB = initMysqlDB(&group)
return &group
}
func (*Group) Create(ctx context.Context, groups []*Group) (err error) {

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
//
//func UpdateGroupRequest(groupRequest GroupRequest) error {

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"

View File

@ -0,0 +1,67 @@
package mysql
import (
"Open_IM/pkg/common/config"
"fmt"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
func initMysqlDB(model interface{}) *gorm.DB {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
var db *gorm.DB
db, err := gorm.Open(mysql.Open(dsn), nil)
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
db, err = gorm.Open(mysql.Open(dsn), nil)
if err != nil {
panic(err.Error() + " open failed " + dsn)
}
}
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8 COLLATE utf8_general_ci;", config.Config.Mysql.DBDatabaseName)
err = db.Exec(sql).Error
if err != nil {
panic(err.Error() + " Exec failed:" + sql)
}
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
newLogger := logger.New(
Writer{},
logger.Config{
SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold
LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
})
if err != nil {
panic(err.Error() + " Open failed " + dsn)
}
sqlDB, err := db.DB()
if err != nil {
panic(err.Error() + " db.DB() failed ")
}
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime))
sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
//models := []interface{}{&Friend{}, &FriendRequest{}, &Group{}, &GroupMember{}, &GroupRequest{},
// &User{}, &Black{}, &ChatLog{}, &Conversation{}, &AppVersion{}}
db.AutoMigrate(model)
db.Set("gorm:table_options", "CHARSET=utf8")
db.Set("gorm:table_options", "collation=utf8_unicode_ci")
_ = db.Migrator().CreateTable(model)
return db.Model(model)
}
type Writer struct{}
func (w Writer) Printf(format string, args ...interface{}) {
fmt.Printf(format, args...)
}

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
//type Register struct {
// Account string `gorm:"column:account;primary_key;type:char(255)" json:"account"`

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/constant"

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"

View File

@ -1,9 +1,9 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/utils"
"errors"
"fmt"
"gorm.io/gorm"
"time"
@ -14,10 +14,28 @@ var (
UserDB *gorm.DB
)
type BlackList struct {
UserId string `gorm:"column:uid"`
BeginDisableTime time.Time `gorm:"column:begin_disable_time"`
EndDisableTime time.Time `gorm:"column:end_disable_time"`
func InitManager() {
for k, v := range config.Config.Manager.AppManagerUid {
_, err := GetUserByUserID(v)
if err != nil {
} else {
continue
}
var appMgr User
appMgr.UserID = v
if k == 0 {
appMgr.Nickname = config.Config.Manager.AppSysNotificationName
} else {
appMgr.Nickname = "AppManager" + utils.IntToString(k+1)
}
appMgr.AppMangerLevel = constant.AppAdmin
err = UserRegister(appMgr)
if err != nil {
fmt.Println("AppManager insert error ", err.Error(), appMgr)
} else {
fmt.Println("AppManager insert ", appMgr)
}
}
}
func UserRegister(user User) error {
@ -130,110 +148,17 @@ func AddUser(userID string, phoneNumber string, name string, email string, gende
return result.Error
}
func UserIsBlock(userId string) (bool, error) {
var user BlackList
rows := BlackListDB.Where("uid=?", userId).First(&user).RowsAffected
if rows >= 1 {
return user.EndDisableTime.After(time.Now()), nil
}
return false, nil
}
func UsersIsBlock(userIDList []string) (inBlockUserIDList []string, err error) {
err = BlackListDB.Where("uid in (?) and end_disable_time > now()", userIDList).Pluck("uid", &inBlockUserIDList).Error
return inBlockUserIDList, err
}
func BlockUser(userID, endDisableTime string) error {
user, err := GetUserByUserID(userID)
if err != nil || user.UserID == "" {
return err
}
end, err := time.Parse("2006-01-02 15:04:05", endDisableTime)
if err != nil {
return err
}
if end.Before(time.Now()) {
return errors.New("endDisableTime is before now")
}
var blockUser BlackList
BlackListDB.Where("uid=?", userID).First(&blockUser)
if blockUser.UserId != "" {
BlackListDB.Where("uid=?", blockUser.UserId).Update("end_disable_time", end)
return nil
}
blockUser = BlackList{
UserId: userID,
BeginDisableTime: time.Now(),
EndDisableTime: end,
}
err = BlackListDB.Create(&blockUser).Error
return err
}
func UnBlockUser(userID string) error {
return BlackListDB.Where("uid=?", userID).Delete(&BlackList{}).Error
}
type BlockUserInfo struct {
User User
BeginDisableTime time.Time
EndDisableTime time.Time
}
func GetBlockUserByID(userId string) (BlockUserInfo, error) {
var blockUserInfo BlockUserInfo
blockUser := BlackList{
UserId: userId,
}
if err := BlackListDB.Table("black_lists").Where("uid=?", userId).Find(&blockUser).Error; err != nil {
return blockUserInfo, err
}
user := User{
UserID: blockUser.UserId,
}
if err := BlackListDB.Find(&user).Error; err != nil {
return blockUserInfo, err
}
blockUserInfo.User.UserID = user.UserID
blockUserInfo.User.FaceURL = user.FaceURL
blockUserInfo.User.Nickname = user.Nickname
blockUserInfo.User.Birth = user.Birth
blockUserInfo.User.PhoneNumber = user.PhoneNumber
blockUserInfo.User.Email = user.Email
blockUserInfo.User.Gender = user.Gender
blockUserInfo.BeginDisableTime = blockUser.BeginDisableTime
blockUserInfo.EndDisableTime = blockUser.EndDisableTime
return blockUserInfo, nil
}
func GetBlockUsers(showNumber, pageNumber int32) ([]BlockUserInfo, error) {
var blockUserInfos []BlockUserInfo
var blockUsers []BlackList
if err := BlackListDB.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&blockUsers).Error; err != nil {
return blockUserInfos, err
}
for _, blockUser := range blockUsers {
var user User
if err := UserDB.Table("users").Where("user_id=?", blockUser.UserId).First(&user).Error; err == nil {
blockUserInfos = append(blockUserInfos, BlockUserInfo{
User: User{
UserID: user.UserID,
Nickname: user.Nickname,
FaceURL: user.FaceURL,
Birth: user.Birth,
PhoneNumber: user.PhoneNumber,
Email: user.Email,
Gender: user.Gender,
},
BeginDisableTime: blockUser.BeginDisableTime,
EndDisableTime: blockUser.EndDisableTime,
})
}
}
return blockUserInfos, nil
}
func GetUserByName(userName string, showNumber, pageNumber int32) ([]User, error) {
var users []User
err := UserDB.Where(" name like ?", fmt.Sprintf("%%%s%%", userName)).Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&users).Error

View File

@ -1,4 +1,4 @@
package im_mysql_model
package mysql
import (
"Open_IM/pkg/common/trace_log"

View File

@ -1,48 +0,0 @@
/*
** description("").
** copyright('tuoyun,www.tuoyun.net').
** author("fg,Gordon@tuoyun.net").
** time(2021/3/4 11:18).
*/
package im_mysql_msg_model
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
)
func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error {
chatLog := new(im_mysql_model.ChatLog)
copier.Copy(chatLog, msg.MsgData)
switch msg.MsgData.SessionType {
case constant.GroupChatType, constant.SuperGroupChatType:
chatLog.RecvID = msg.MsgData.GroupID
case constant.SingleChatType:
chatLog.RecvID = msg.MsgData.RecvID
}
if msg.MsgData.ContentType >= constant.NotificationBegin && msg.MsgData.ContentType <= constant.NotificationEnd {
var tips server_api_params.TipsComm
_ = proto.Unmarshal(msg.MsgData.Content, &tips)
marshaler := jsonpb.Marshaler{
OrigName: true,
EnumsAsInts: false,
EmitDefaults: false,
}
chatLog.Content, _ = marshaler.MarshalToString(&tips)
} else {
chatLog.Content = string(msg.MsgData.Content)
}
chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime)
chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime)
log.NewDebug("test", "this is ", chatLog)
return db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").Create(chatLog).Error
}

View File

@ -1,16 +0,0 @@
package im_mysql_msg_model
import (
"Open_IM/pkg/common/config"
"hash/crc32"
)
func getHashMsgDBAddr(userID string) string {
hCode := crc32.ChecksumIEEE([]byte(userID))
return config.Config.Mysql.DBAddress[hCode%uint32(len(config.Config.Mysql.DBAddress))]
}
func getHashMsgTableIndex(userID string) int {
hCode := crc32.ChecksumIEEE([]byte(userID))
return int(hCode % uint32(config.Config.Mysql.DBMsgTableNum))
}