From a03f89fb68e6313aaeed4432e5c11ce3b0ba85ce Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 25 Apr 2022 20:05:21 +0800 Subject: [PATCH] cache rpc --- cmd/rpc/open_im_rpc/Makefile | 23 +++++ cmd/rpc/open_im_rpc/main.go | 15 ++++ config/config.yaml | 1 + internal/api/third/minio_init.go | 11 +-- internal/api/user/user.go | 144 ++++++++++++++++++++++++++----- internal/rpc/cache/cache.go | 103 ++++++++++++++++++++++ internal/rpc/office/office.go | 5 +- pkg/common/config/config.go | 1 + pkg/common/db/redisModel.go | 23 +++++ pkg/proto/cache/cache.proto | 44 ++++++++++ 10 files changed, 340 insertions(+), 30 deletions(-) create mode 100644 cmd/rpc/open_im_rpc/Makefile create mode 100644 cmd/rpc/open_im_rpc/main.go create mode 100644 internal/rpc/cache/cache.go create mode 100644 pkg/proto/cache/cache.proto diff --git a/cmd/rpc/open_im_rpc/Makefile b/cmd/rpc/open_im_rpc/Makefile new file mode 100644 index 000000000..b86230c64 --- /dev/null +++ b/cmd/rpc/open_im_rpc/Makefile @@ -0,0 +1,23 @@ +.PHONY: all build run gotool install clean help + +BINARY_NAME=open_im_office +BIN_DIR=../../../bin/ + +all: gotool build + +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" + +run: + @go run ./ + +gotool: + go fmt ./ + go vet ./ + +install: + make build + mv ${BINARY_NAME} ${BIN_DIR} + +clean: + @if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi diff --git a/cmd/rpc/open_im_rpc/main.go b/cmd/rpc/open_im_rpc/main.go new file mode 100644 index 000000000..efe887c1e --- /dev/null +++ b/cmd/rpc/open_im_rpc/main.go @@ -0,0 +1,15 @@ +package main + +import ( + rpc "Open_IM/internal/rpc/cache" + "flag" + "fmt" +) + +func main() { + rpcPort := flag.Int("port", 11500, "rpc listening port") + flag.Parse() + fmt.Println("start office rpc server, port: ", *rpcPort) + rpcServer := rpc.NewOfficeServer(*rpcPort) + rpcServer.Run() +} diff --git a/config/config.yaml b/config/config.yaml index 285dd0434..4bb653511 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -148,6 +148,7 @@ rpcregistername: #rpc注册服务名,默认即可 openImOfficeName: Office openImOrganizationName: Organization openImConversationName: Conversation + openImCacheName: Cache log: storageLocation: ../logs/ diff --git a/internal/api/third/minio_init.go b/internal/api/third/minio_init.go index b3d66b934..2c00965bf 100644 --- a/internal/api/third/minio_init.go +++ b/internal/api/third/minio_init.go @@ -7,6 +7,7 @@ import ( "context" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/policy" url2 "net/url" ) @@ -58,10 +59,10 @@ func MinioInit() { } } // 自动化桶public的代码 - //err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite) - //if err != nil { - // log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in web", err.Error()) - // return - //} + err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite) + if err != nil { + log.NewDebug("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in web", err.Error()) + return + } log.NewInfo(operationID, utils.GetSelfFuncName(), "minio create and set policy success") } diff --git a/internal/api/user/user.go b/internal/api/user/user.go index 4fbbcf5ef..e56689897 100644 --- a/internal/api/user/user.go +++ b/internal/api/user/user.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" + "Open_IM/pkg/proto/cache" pbRelay "Open_IM/pkg/proto/relay" open_im_sdk "Open_IM/pkg/proto/sdk_ws" rpc "Open_IM/pkg/proto/user" @@ -25,34 +26,100 @@ func GetUsersInfo(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": http.StatusBadRequest, "errMsg": err.Error()}) return } - req := &rpc.GetUserInfoReq{} - utils.CopyStructFields(req, ¶ms) + getUserInfoReq := &rpc.GetUserInfoReq{} + getUserInfoReq.OperationID = params.OperationID var ok bool - ok, req.OpUserID = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID) + ok, getUserInfoReq.OpUserID = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), getUserInfoReq.OperationID) if !ok { - log.NewError(req.OperationID, "GetUserIDFromToken false ", c.Request.Header.Get("token")) + log.NewError(getUserInfoReq.OperationID, "GetUserIDFromToken false ", c.Request.Header.Get("token")) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"}) return } - log.NewInfo(params.OperationID, "GetUserInfo args ", req.String()) - - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName) - client := rpc.NewUserClient(etcdConn) - RpcResp, err := client.GetUserInfo(context.Background(), req) + log.NewInfo(params.OperationID, "GetUserInfo args ", getUserInfoReq.String()) + reqCacheGetUserInfo := &cache.GetUserInfoReq{} + utils.CopyStructFields(reqCacheGetUserInfo, ¶ms) + var userInfoList []*open_im_sdk.UserInfo + var publicUserInfoList []*open_im_sdk.PublicUserInfo + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + cacheClient := cache.NewCacheClient(etcdConn) + cacheResp, err := cacheClient.GetUserInfo(context.Background(), reqCacheGetUserInfo) if err != nil { - log.NewError(req.OperationID, "GetUserInfo failed ", err.Error(), req.String()) + log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", err.Error()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()}) + return + } + if cacheResp.CommonResp.ErrCode != 0 { + log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", cacheResp.CommonResp) + resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}} + resp.Data = []map[string]interface{}{} + log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp) + c.JSON(http.StatusOK, resp) + return + } + log.NewInfo(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "cacheResp:", cacheResp.String()) + userInfoList = cacheResp.UserInfoList + var needCacheUserIDList []string + for _, userID := range reqCacheGetUserInfo.UserIDList { + isGetUserInfoFromCache := false + for _, cacheUser := range userInfoList { + if cacheUser.UserID == userID { + isGetUserInfoFromCache = true + } + } + if !isGetUserInfoFromCache { + needCacheUserIDList = append(needCacheUserIDList, userID) + } + } + if len(needCacheUserIDList) == 0 { + log.NewInfo(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "get all userInfo from cache success") + for _, v := range userInfoList { + publicUserInfoList = append(publicUserInfoList, + &open_im_sdk.PublicUserInfo{UserID: v.UserID, Nickname: v.Nickname, FaceURL: v.FaceURL, Gender: v.Gender, Ex: v.Ex}) + } + resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList} + resp.Data = jsonData.JsonDataList(resp.UserInfoList) + log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp) + c.JSON(http.StatusOK, resp) + return + } + + log.NewDebug(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "need cache user list", needCacheUserIDList) + getUserInfoReq.UserIDList = needCacheUserIDList + etcdConn = getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName) + client := rpc.NewUserClient(etcdConn) + rpcResp, err := client.GetUserInfo(context.Background(), getUserInfoReq) + if err != nil { + log.NewError(getUserInfoReq.OperationID, "GetUserInfo failed ", err.Error(), getUserInfoReq.String()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed" + err.Error()}) + return + } + if rpcResp.CommonResp.ErrCode != 0 { + log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", cacheResp.CommonResp) + resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}} + resp.Data = []map[string]interface{}{} + log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp) + c.JSON(http.StatusOK, resp) + return + } + userInfoList = append(userInfoList, rpcResp.UserInfoList...) + cacheUpdateUserInfoReq := &cache.UpdateUserInfoReq{ + UserInfoList: rpcResp.UserInfoList, + OperationID: getUserInfoReq.OperationID, + } + _, err = cacheClient.UpdateUserInfo(context.Background(), cacheUpdateUserInfoReq) + if err != nil { + log.NewError(getUserInfoReq.OperationID, "GetUserInfo failed ", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"}) return } - var publicUserInfoList []*open_im_sdk.PublicUserInfo - for _, v := range RpcResp.UserInfoList { + userInfoList = rpcResp.UserInfoList + for _, v := range userInfoList { publicUserInfoList = append(publicUserInfoList, &open_im_sdk.PublicUserInfo{UserID: v.UserID, Nickname: v.Nickname, FaceURL: v.FaceURL, Gender: v.Gender, Ex: v.Ex}) } - - resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList} + resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: rpcResp.CommonResp.ErrCode, ErrMsg: rpcResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList} resp.Data = jsonData.JsonDataList(resp.UserInfoList) - log.NewInfo(req.OperationID, "GetUserInfo api return ", resp) + log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp) c.JSON(http.StatusOK, resp) } @@ -75,18 +142,34 @@ func UpdateUserInfo(c *gin.Context) { return } log.NewInfo(params.OperationID, "UpdateUserInfo args ", req.String()) - - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName) - client := rpc.NewUserClient(etcdConn) - RpcResp, err := client.UpdateUserInfo(context.Background(), req) + etcdConnUser := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName) + client := rpc.NewUserClient(etcdConnUser) + rpcResp, err := client.UpdateUserInfo(context.Background(), req) if err != nil { log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), req.String()) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"}) return } - resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}} - log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp) - c.JSON(http.StatusOK, resp) + if rpcResp.CommonResp.ErrCode != 0 { + log.NewError(req.OperationID, utils.GetSelfFuncName(), rpcResp.CommonResp.String()) + resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: rpcResp.CommonResp.ErrCode, ErrMsg: rpcResp.CommonResp.ErrMsg}} + c.JSON(http.StatusOK, resp) + return + } else { + updateUserInfoReq := &cache.UpdateUserInfoReq{UserInfoList: []*open_im_sdk.UserInfo{req.UserInfo}} + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), updateUserInfoReq.String()) + etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + cacheClient := cache.NewCacheClient(etcdConnCache) + cacheResp, err := cacheClient.UpdateUserInfo(context.Background(), updateUserInfoReq) + if err != nil { + log.NewError(req.OperationID, "UpdateUserInfo cache failed ", err.Error(), req.String()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()}) + return + } + resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}} + log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp) + c.JSON(http.StatusOK, resp) + } } func GetSelfUserInfo(c *gin.Context) { @@ -118,6 +201,22 @@ func GetSelfUserInfo(c *gin.Context) { return } if len(RpcResp.UserInfoList) == 1 { + updateUserInfoReq := &cache.UpdateUserInfoReq{UserInfoList: []*open_im_sdk.UserInfo{RpcResp.UserInfoList[0]}} + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), updateUserInfoReq.String()) + etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + cacheClient := cache.NewCacheClient(etcdConnCache) + cacheResp, err := cacheClient.UpdateUserInfo(context.Background(), updateUserInfoReq) + if err != nil { + log.NewError(req.OperationID, "UpdateUserInfo cache failed ", err.Error(), req.String()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()}) + return + } + if cacheResp.CommonResp.ErrCode != 0 { + log.NewError(req.OperationID, utils.GetSelfFuncName(), cacheResp.CommonResp.String()) + resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}} + c.JSON(http.StatusOK, resp) + return + } resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}, UserInfo: RpcResp.UserInfoList[0]} resp.Data = jsonData.JsonDataOne(resp.UserInfo) log.NewInfo(req.OperationID, "GetUserInfo api return ", resp) @@ -127,7 +226,6 @@ func GetSelfUserInfo(c *gin.Context) { log.NewInfo(req.OperationID, "GetUserInfo api return ", resp) c.JSON(http.StatusOK, resp) } - } func GetUsersOnlineStatus(c *gin.Context) { diff --git a/internal/rpc/cache/cache.go b/internal/rpc/cache/cache.go new file mode 100644 index 000000000..e172ed723 --- /dev/null +++ b/internal/rpc/cache/cache.go @@ -0,0 +1,103 @@ +package cache + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbCache "Open_IM/pkg/proto/cache" + commonPb "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + "context" + "google.golang.org/grpc" + "net" + "strconv" + "strings" +) + +type cacheServer struct { + rpcPort int + rpcRegisterName string + etcdSchema string + etcdAddr []string +} + +func NewOfficeServer(port int) *cacheServer { + log.NewPrivateLog(constant.LogFileName) + return &cacheServer{ + rpcPort: port, + rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, + etcdSchema: config.Config.Etcd.EtcdSchema, + etcdAddr: config.Config.Etcd.EtcdAddr, + } +} + +func (s *cacheServer) Run() { + log.NewInfo("0", "cacheServer rpc start ") + ip := utils.ServerIP + registerAddress := ip + ":" + strconv.Itoa(s.rpcPort) + //listener network + listener, err := net.Listen("tcp", registerAddress) + if err != nil { + log.NewError("0", "Listen failed ", err.Error(), registerAddress) + return + } + log.NewInfo("0", "listen network success, ", registerAddress, listener) + defer listener.Close() + //grpc server + srv := grpc.NewServer() + defer srv.GracefulStop() + pbCache.RegisterCacheServer(srv, s) + err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName, 10) + if err != nil { + log.NewError("0", "RegisterEtcd failed ", err.Error()) + return + } + err = srv.Serve(listener) + if err != nil { + log.NewError("0", "Serve failed ", err.Error()) + return + } + log.NewInfo("0", "message cms rpc success") +} + +func (s *cacheServer) GetUserInfo(_ context.Context, req *pbCache.GetUserInfoReq) (resp *pbCache.GetUserInfoResp, err error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) + resp = &pbCache.GetUserInfoResp{ + UserInfoList: []*commonPb.UserInfo{}, + CommonResp: &pbCache.CommonResp{}, + } + for _, userID := range req.UserIDList { + userInfo, err := db.DB.GetUserInfo(userID) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "get userInfo from cache failed", err.Error()) + continue + } + resp.UserInfoList = append(resp.UserInfoList, userInfo) + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) + return resp, nil +} + +func (s *cacheServer) UpdateUserInfo(_ context.Context, req *pbCache.UpdateUserInfoReq) (resp *pbCache.UpdateUserInfoResp, err error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) + resp = &pbCache.UpdateUserInfoResp{ + CommonResp: &pbCache.CommonResp{}, + } + for _, userInfo := range req.UserInfoList { + if err := db.DB.SetUserInfo(userInfo); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "set userInfo to cache failed", err.Error()) + return resp, nil + } + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) + return resp, nil +} + +func (s *cacheServer) UpdateAllUserToCache(_ context.Context, req *pbCache.UpdateAllUserToCacheReq) (resp *pbCache.UpdateAllUserToCacheResp, err error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) + resp = &pbCache.UpdateAllUserToCacheResp{CommonResp: &pbCache.CommonResp{}} + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) + return resp, nil +} diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index dffaa872a..3fd9fb6fe 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -508,14 +508,15 @@ func (s *officeServer) GetUserFriendWorkMoments(_ context.Context, req *pbOffice log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp = &pbOffice.GetUserFriendWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}} resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber} - //resp.WorkMoments = make([]*pbOffice.WorkMoment, 0) friendIDList, err := imdb.GetFriendIDListByUserID(req.UserID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetFriendIDListByUserID", err.Error()) resp.CommonResp = &pbOffice.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } - log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "friendIDList: ", friendIDList) + for _, friendID := range friendIDList { + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), *friendID) + } workMoments, err := db.DB.GetUserFriendWorkMoments(friendIDList, req.Pagination.ShowNumber, req.Pagination.PageNumber, req.UserID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserFriendWorkMoments", err.Error()) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 49e0e0849..bd4998335 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -119,6 +119,7 @@ type config struct { OpenImOfficeName string `yaml:"openImOfficeName"` OpenImOrganizationName string `yaml:"openImOrganizationName"` OpenImConversationName string `yaml:"openImConversationName"` + OpenImCacheName string `yaml:"openImCacheName"` } Etcd struct { EtcdSchema string `yaml:"etcdSchema"` diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index 2faa9a6d8..9b32b18d7 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -3,6 +3,8 @@ package db import ( "Open_IM/pkg/common/constant" log2 "Open_IM/pkg/common/log" + pbCommon "Open_IM/pkg/proto/sdk_ws" + "encoding/json" "github.com/garyburd/redigo/redis" ) @@ -15,6 +17,7 @@ const ( uidPidToken = "UID_PID_TOKEN_STATUS:" conversationReceiveMessageOpt = "CON_RECV_MSG_OPT:" GetuiToken = "GETUI" + UserInfoCache = "USER_INFO_CACHE:" ) func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) { @@ -155,3 +158,23 @@ func (d *DataBases) GetGetuiToken() (string, error) { result, err := redis.String(d.Exec("GET", GetuiToken)) return result, err } + +func (d *DataBases) SetUserInfo(userInfo *pbCommon.UserInfo) error { + b, _ := json.Marshal(&userInfo) + m := map[string]interface{}{} + if err := json.Unmarshal(b, &m); err != nil { + return err + } + _, err := d.Exec("hmset", UserInfoCache+userInfo.UserID, redis.Args{}.Add().AddFlat(m)...) + return err +} + +func (d *DataBases) GetUserInfo(userID string) (*pbCommon.UserInfo, error) { + result, err := redis.String(d.Exec("HGETALL", UserInfoCache+userID)) + if err != nil { + return nil, err + } + userInfo := &pbCommon.UserInfo{} + err = json.Unmarshal([]byte(result), userInfo) + return userInfo, err +} diff --git a/pkg/proto/cache/cache.proto b/pkg/proto/cache/cache.proto new file mode 100644 index 000000000..cecfabd3c --- /dev/null +++ b/pkg/proto/cache/cache.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +import "Open_IM/pkg/proto/sdk_ws/ws.proto"; +option go_package = "./cache;cache"; +package cache; + +message CommonResp{ + int32 errCode = 1; + string errMsg = 2; +} + +message GetUserInfoReq{ + repeated string userIDList = 1; + string operationID = 3; +} + +message GetUserInfoResp{ + CommonResp commonResp = 1; + repeated server_api_params.UserInfo UserInfoList = 2; +} + + +message UpdateUserInfoReq{ + repeated server_api_params.UserInfo UserInfoList = 1; + string operationID = 2; +} + +message UpdateUserInfoResp{ + CommonResp commonResp = 1; +} + +message UpdateAllUserToCacheReq{ + string operationID = 1; +} + +message UpdateAllUserToCacheResp{ + CommonResp commonResp = 1; +} + + +service cache{ + rpc GetUserInfo(GetUserInfoReq) returns(GetUserInfoResp); + rpc UpdateUserInfo(UpdateUserInfoReq) returns(UpdateUserInfoResp); + rpc UpdateAllUserToCache(UpdateAllUserToCacheReq) returns(UpdateAllUserToCacheResp); +}