diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go index f8aaca54a..6e2c9614a 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/model.go @@ -8,8 +8,8 @@ import ( //"Open_IM/pkg/common/log" "Open_IM/pkg/utils" "fmt" + go_redis "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/mongo/options" - // "context" // "fmt" "github.com/garyburd/redigo/redis" @@ -30,6 +30,7 @@ type DataBases struct { mgoSession *mgo.Session redisPool *redis.Pool mongoClient *mongo.Client + rdb *go_redis.ClusterClient } func key(dbAddress, dbName string) string { @@ -113,6 +114,18 @@ func init() { ) }, } + + DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ + Addrs: []string{config.Config.Redis.DBAddress}, + PoolSize: 100, + Password: config.Config.Redis.DBPassWord, + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = DB.rdb.Ping(ctx).Result() + if err != nil { + panic(err.Error()) + } } func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error { diff --git a/pkg/common/db/newRedisModel.go b/pkg/common/db/newRedisModel.go new file mode 100644 index 000000000..f2b5f8930 --- /dev/null +++ b/pkg/common/db/newRedisModel.go @@ -0,0 +1,44 @@ +package db + +import ( + "Open_IM/pkg/common/config" + log2 "Open_IM/pkg/common/log" + pbChat "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" + "context" + "errors" + "fmt" + "strconv" + "time" +) + +//func (d * DataBases)pubMessage(channel, msg string) { +// d.rdb.Publish(context.Background(),channel,msg) +//} +//func (d * DataBases)pubMessage(channel, msg string) { +// d.rdb.Publish(context.Background(),channel,msg) +//} + +func (d *DataBases) NewSetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { + ctx := context.Background() + var failedList []pbChat.MsgDataToMQ + for _, msg := range msgList { + key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + s, err := utils.Pb2Map(msg.MsgData) + if err != nil { + log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2Map failed", msg.MsgData.String(), uid, err.Error()) + continue + } + log2.NewDebug(operationID, "convert map is ", s) + val, err := d.rdb.HMSet(ctx, key, s).Result() + if err != nil { + log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, val) + failedList = append(failedList, *msg) + } + d.rdb.Expire(ctx, key, time.Second*time.Duration(config.Config.MsgCacheTimeout)) + } + if len(failedList) != 0 { + return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %s", failedList)) + } + return nil +} diff --git a/pkg/common/db/redisModel_test.go b/pkg/common/db/redisModel_test.go index 1d9d254b7..f6f940209 100644 --- a/pkg/common/db/redisModel_test.go +++ b/pkg/common/db/redisModel_test.go @@ -1,6 +1,9 @@ package db import ( + pbChat "Open_IM/pkg/proto/chat" + "context" + "flag" "fmt" "github.com/stretchr/testify/assert" "testing" @@ -25,3 +28,29 @@ func TestDataBases_GetMultiConversationMsgOpt(t *testing.T) { assert.Nil(t, err) fmt.Println(m) } +func Test_GetKeyTTL(t *testing.T) { + ctx := context.Background() + key := flag.String("key", "key", "key value") + flag.Parse() + ttl, err := DB.rdb.TTL(ctx, *key).Result() + assert.Nil(t, err) + fmt.Println(ttl) +} +func Test_HGetAll(t *testing.T) { + ctx := context.Background() + key := flag.String("key", "key", "key value") + flag.Parse() + ttl, err := DB.rdb.TTL(ctx, *key).Result() + assert.Nil(t, err) + fmt.Println(ttl) +} +func Test_NewSetMessageToCache(t *testing.T) { + var msg pbChat.MsgDataToMQ + uid := "test_uid" + msg.MsgData.Seq = 11 + msg.MsgData.ClientMsgID = "23jwhjsdf" + messageList := []*pbChat.MsgDataToMQ{&msg} + err := DB.NewSetMessageToCache(messageList, uid, "test") + assert.Nil(t, err) + +}