cache rpc

This commit is contained in:
wangchuxiao 2022-04-25 20:05:21 +08:00
parent 51146a4ed5
commit a03f89fb68
10 changed files with 340 additions and 30 deletions

View File

@ -0,0 +1,23 @@
.PHONY: all build run gotool install clean help
BINARY_NAME=open_im_office
BIN_DIR=../../../bin/
all: gotool build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s"
run:
@go run ./
gotool:
go fmt ./
go vet ./
install:
make build
mv ${BINARY_NAME} ${BIN_DIR}
clean:
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi

View File

@ -0,0 +1,15 @@
package main
import (
rpc "Open_IM/internal/rpc/cache"
"flag"
"fmt"
)
func main() {
rpcPort := flag.Int("port", 11500, "rpc listening port")
flag.Parse()
fmt.Println("start office rpc server, port: ", *rpcPort)
rpcServer := rpc.NewOfficeServer(*rpcPort)
rpcServer.Run()
}

View File

@ -148,6 +148,7 @@ rpcregistername: #rpc注册服务名默认即可
openImOfficeName: Office
openImOrganizationName: Organization
openImConversationName: Conversation
openImCacheName: Cache
log:
storageLocation: ../logs/

View File

@ -7,6 +7,7 @@ import (
"context"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/policy"
url2 "net/url"
)
@ -58,10 +59,10 @@ func MinioInit() {
}
}
// 自动化桶public的代码
//err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite)
//if err != nil {
// log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in web", err.Error())
// return
//}
err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite)
if err != nil {
log.NewDebug("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in web", err.Error())
return
}
log.NewInfo(operationID, utils.GetSelfFuncName(), "minio create and set policy success")
}

View File

@ -8,6 +8,7 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/cache"
pbRelay "Open_IM/pkg/proto/relay"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
rpc "Open_IM/pkg/proto/user"
@ -25,34 +26,100 @@ func GetUsersInfo(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": http.StatusBadRequest, "errMsg": err.Error()})
return
}
req := &rpc.GetUserInfoReq{}
utils.CopyStructFields(req, &params)
getUserInfoReq := &rpc.GetUserInfoReq{}
getUserInfoReq.OperationID = params.OperationID
var ok bool
ok, req.OpUserID = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
ok, getUserInfoReq.OpUserID = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), getUserInfoReq.OperationID)
if !ok {
log.NewError(req.OperationID, "GetUserIDFromToken false ", c.Request.Header.Get("token"))
log.NewError(getUserInfoReq.OperationID, "GetUserIDFromToken false ", c.Request.Header.Get("token"))
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"})
return
}
log.NewInfo(params.OperationID, "GetUserInfo args ", req.String())
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := rpc.NewUserClient(etcdConn)
RpcResp, err := client.GetUserInfo(context.Background(), req)
log.NewInfo(params.OperationID, "GetUserInfo args ", getUserInfoReq.String())
reqCacheGetUserInfo := &cache.GetUserInfoReq{}
utils.CopyStructFields(reqCacheGetUserInfo, &params)
var userInfoList []*open_im_sdk.UserInfo
var publicUserInfoList []*open_im_sdk.PublicUserInfo
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
cacheClient := cache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.GetUserInfo(context.Background(), reqCacheGetUserInfo)
if err != nil {
log.NewError(req.OperationID, "GetUserInfo failed ", err.Error(), req.String())
log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()})
return
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", cacheResp.CommonResp)
resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}
resp.Data = []map[string]interface{}{}
log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
return
}
log.NewInfo(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "cacheResp:", cacheResp.String())
userInfoList = cacheResp.UserInfoList
var needCacheUserIDList []string
for _, userID := range reqCacheGetUserInfo.UserIDList {
isGetUserInfoFromCache := false
for _, cacheUser := range userInfoList {
if cacheUser.UserID == userID {
isGetUserInfoFromCache = true
}
}
if !isGetUserInfoFromCache {
needCacheUserIDList = append(needCacheUserIDList, userID)
}
}
if len(needCacheUserIDList) == 0 {
log.NewInfo(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "get all userInfo from cache success")
for _, v := range userInfoList {
publicUserInfoList = append(publicUserInfoList,
&open_im_sdk.PublicUserInfo{UserID: v.UserID, Nickname: v.Nickname, FaceURL: v.FaceURL, Gender: v.Gender, Ex: v.Ex})
}
resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList}
resp.Data = jsonData.JsonDataList(resp.UserInfoList)
log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
return
}
log.NewDebug(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "need cache user list", needCacheUserIDList)
getUserInfoReq.UserIDList = needCacheUserIDList
etcdConn = getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := rpc.NewUserClient(etcdConn)
rpcResp, err := client.GetUserInfo(context.Background(), getUserInfoReq)
if err != nil {
log.NewError(getUserInfoReq.OperationID, "GetUserInfo failed ", err.Error(), getUserInfoReq.String())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed" + err.Error()})
return
}
if rpcResp.CommonResp.ErrCode != 0 {
log.NewError(getUserInfoReq.OperationID, utils.GetSelfFuncName(), "GetUserInfo failed", cacheResp.CommonResp)
resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}
resp.Data = []map[string]interface{}{}
log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
return
}
userInfoList = append(userInfoList, rpcResp.UserInfoList...)
cacheUpdateUserInfoReq := &cache.UpdateUserInfoReq{
UserInfoList: rpcResp.UserInfoList,
OperationID: getUserInfoReq.OperationID,
}
_, err = cacheClient.UpdateUserInfo(context.Background(), cacheUpdateUserInfoReq)
if err != nil {
log.NewError(getUserInfoReq.OperationID, "GetUserInfo failed ", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"})
return
}
var publicUserInfoList []*open_im_sdk.PublicUserInfo
for _, v := range RpcResp.UserInfoList {
userInfoList = rpcResp.UserInfoList
for _, v := range userInfoList {
publicUserInfoList = append(publicUserInfoList,
&open_im_sdk.PublicUserInfo{UserID: v.UserID, Nickname: v.Nickname, FaceURL: v.FaceURL, Gender: v.Gender, Ex: v.Ex})
}
resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList}
resp := api.GetUsersInfoResp{CommResp: api.CommResp{ErrCode: rpcResp.CommonResp.ErrCode, ErrMsg: rpcResp.CommonResp.ErrMsg}, UserInfoList: publicUserInfoList}
resp.Data = jsonData.JsonDataList(resp.UserInfoList)
log.NewInfo(req.OperationID, "GetUserInfo api return ", resp)
log.NewInfo(getUserInfoReq.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
}
@ -75,18 +142,34 @@ func UpdateUserInfo(c *gin.Context) {
return
}
log.NewInfo(params.OperationID, "UpdateUserInfo args ", req.String())
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := rpc.NewUserClient(etcdConn)
RpcResp, err := client.UpdateUserInfo(context.Background(), req)
etcdConnUser := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := rpc.NewUserClient(etcdConnUser)
rpcResp, err := client.UpdateUserInfo(context.Background(), req)
if err != nil {
log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), req.String())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"})
return
}
resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}}
log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
if rpcResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, utils.GetSelfFuncName(), rpcResp.CommonResp.String())
resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: rpcResp.CommonResp.ErrCode, ErrMsg: rpcResp.CommonResp.ErrMsg}}
c.JSON(http.StatusOK, resp)
return
} else {
updateUserInfoReq := &cache.UpdateUserInfoReq{UserInfoList: []*open_im_sdk.UserInfo{req.UserInfo}}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), updateUserInfoReq.String())
etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
cacheClient := cache.NewCacheClient(etcdConnCache)
cacheResp, err := cacheClient.UpdateUserInfo(context.Background(), updateUserInfoReq)
if err != nil {
log.NewError(req.OperationID, "UpdateUserInfo cache failed ", err.Error(), req.String())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()})
return
}
resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}
log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
}
}
func GetSelfUserInfo(c *gin.Context) {
@ -118,6 +201,22 @@ func GetSelfUserInfo(c *gin.Context) {
return
}
if len(RpcResp.UserInfoList) == 1 {
updateUserInfoReq := &cache.UpdateUserInfoReq{UserInfoList: []*open_im_sdk.UserInfo{RpcResp.UserInfoList[0]}}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), updateUserInfoReq.String())
etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
cacheClient := cache.NewCacheClient(etcdConnCache)
cacheResp, err := cacheClient.UpdateUserInfo(context.Background(), updateUserInfoReq)
if err != nil {
log.NewError(req.OperationID, "UpdateUserInfo cache failed ", err.Error(), req.String())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed: " + err.Error()})
return
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, utils.GetSelfFuncName(), cacheResp.CommonResp.String())
resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}
c.JSON(http.StatusOK, resp)
return
}
resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}, UserInfo: RpcResp.UserInfoList[0]}
resp.Data = jsonData.JsonDataOne(resp.UserInfo)
log.NewInfo(req.OperationID, "GetUserInfo api return ", resp)
@ -127,7 +226,6 @@ func GetSelfUserInfo(c *gin.Context) {
log.NewInfo(req.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
}
}
func GetUsersOnlineStatus(c *gin.Context) {

103
internal/rpc/cache/cache.go vendored Normal file
View File

@ -0,0 +1,103 @@
package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbCache "Open_IM/pkg/proto/cache"
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
)
type cacheServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
}
func NewOfficeServer(port int) *cacheServer {
log.NewPrivateLog(constant.LogFileName)
return &cacheServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
}
}
func (s *cacheServer) Run() {
log.NewInfo("0", "cacheServer rpc start ")
ip := utils.ServerIP
registerAddress := ip + ":" + strconv.Itoa(s.rpcPort)
//listener network
listener, err := net.Listen("tcp", registerAddress)
if err != nil {
log.NewError("0", "Listen failed ", err.Error(), registerAddress)
return
}
log.NewInfo("0", "listen network success, ", registerAddress, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
defer srv.GracefulStop()
pbCache.RegisterCacheServer(srv, s)
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
return
}
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "message cms rpc success")
}
func (s *cacheServer) GetUserInfo(_ context.Context, req *pbCache.GetUserInfoReq) (resp *pbCache.GetUserInfoResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.GetUserInfoResp{
UserInfoList: []*commonPb.UserInfo{},
CommonResp: &pbCache.CommonResp{},
}
for _, userID := range req.UserIDList {
userInfo, err := db.DB.GetUserInfo(userID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "get userInfo from cache failed", err.Error())
continue
}
resp.UserInfoList = append(resp.UserInfoList, userInfo)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) UpdateUserInfo(_ context.Context, req *pbCache.UpdateUserInfoReq) (resp *pbCache.UpdateUserInfoResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.UpdateUserInfoResp{
CommonResp: &pbCache.CommonResp{},
}
for _, userInfo := range req.UserInfoList {
if err := db.DB.SetUserInfo(userInfo); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "set userInfo to cache failed", err.Error())
return resp, nil
}
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) UpdateAllUserToCache(_ context.Context, req *pbCache.UpdateAllUserToCacheReq) (resp *pbCache.UpdateAllUserToCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.UpdateAllUserToCacheResp{CommonResp: &pbCache.CommonResp{}}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}

View File

@ -508,14 +508,15 @@ func (s *officeServer) GetUserFriendWorkMoments(_ context.Context, req *pbOffice
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbOffice.GetUserFriendWorkMomentsResp{CommonResp: &pbOffice.CommonResp{}, WorkMoments: []*pbOffice.WorkMoment{}}
resp.Pagination = &pbCommon.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber}
//resp.WorkMoments = make([]*pbOffice.WorkMoment, 0)
friendIDList, err := imdb.GetFriendIDListByUserID(req.UserID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetFriendIDListByUserID", err.Error())
resp.CommonResp = &pbOffice.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "friendIDList: ", friendIDList)
for _, friendID := range friendIDList {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), *friendID)
}
workMoments, err := db.DB.GetUserFriendWorkMoments(friendIDList, req.Pagination.ShowNumber, req.Pagination.PageNumber, req.UserID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserFriendWorkMoments", err.Error())

View File

@ -119,6 +119,7 @@ type config struct {
OpenImOfficeName string `yaml:"openImOfficeName"`
OpenImOrganizationName string `yaml:"openImOrganizationName"`
OpenImConversationName string `yaml:"openImConversationName"`
OpenImCacheName string `yaml:"openImCacheName"`
}
Etcd struct {
EtcdSchema string `yaml:"etcdSchema"`

View File

@ -3,6 +3,8 @@ package db
import (
"Open_IM/pkg/common/constant"
log2 "Open_IM/pkg/common/log"
pbCommon "Open_IM/pkg/proto/sdk_ws"
"encoding/json"
"github.com/garyburd/redigo/redis"
)
@ -15,6 +17,7 @@ const (
uidPidToken = "UID_PID_TOKEN_STATUS:"
conversationReceiveMessageOpt = "CON_RECV_MSG_OPT:"
GetuiToken = "GETUI"
UserInfoCache = "USER_INFO_CACHE:"
)
func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
@ -155,3 +158,23 @@ func (d *DataBases) GetGetuiToken() (string, error) {
result, err := redis.String(d.Exec("GET", GetuiToken))
return result, err
}
func (d *DataBases) SetUserInfo(userInfo *pbCommon.UserInfo) error {
b, _ := json.Marshal(&userInfo)
m := map[string]interface{}{}
if err := json.Unmarshal(b, &m); err != nil {
return err
}
_, err := d.Exec("hmset", UserInfoCache+userInfo.UserID, redis.Args{}.Add().AddFlat(m)...)
return err
}
func (d *DataBases) GetUserInfo(userID string) (*pbCommon.UserInfo, error) {
result, err := redis.String(d.Exec("HGETALL", UserInfoCache+userID))
if err != nil {
return nil, err
}
userInfo := &pbCommon.UserInfo{}
err = json.Unmarshal([]byte(result), userInfo)
return userInfo, err
}

44
pkg/proto/cache/cache.proto vendored Normal file
View File

@ -0,0 +1,44 @@
syntax = "proto3";
import "Open_IM/pkg/proto/sdk_ws/ws.proto";
option go_package = "./cache;cache";
package cache;
message CommonResp{
int32 errCode = 1;
string errMsg = 2;
}
message GetUserInfoReq{
repeated string userIDList = 1;
string operationID = 3;
}
message GetUserInfoResp{
CommonResp commonResp = 1;
repeated server_api_params.UserInfo UserInfoList = 2;
}
message UpdateUserInfoReq{
repeated server_api_params.UserInfo UserInfoList = 1;
string operationID = 2;
}
message UpdateUserInfoResp{
CommonResp commonResp = 1;
}
message UpdateAllUserToCacheReq{
string operationID = 1;
}
message UpdateAllUserToCacheResp{
CommonResp commonResp = 1;
}
service cache{
rpc GetUserInfo(GetUserInfoReq) returns(GetUserInfoResp);
rpc UpdateUserInfo(UpdateUserInfoReq) returns(UpdateUserInfoResp);
rpc UpdateAllUserToCache(UpdateAllUserToCacheReq) returns(UpdateAllUserToCacheResp);
}