From c9f326c72232bec8c74f019133f31db156f1b80d Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 26 Oct 2021 10:59:59 +0800 Subject: [PATCH 1/3] seq pull change --- src/common/db/redisModel.go | 18 +++++++++++++++-- src/msg_gateway/gate/logic.go | 11 ++++++----- src/rpc/chat/chat/pull_message.go | 33 +++++++++++++++++++------------ 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/common/db/redisModel.go b/src/common/db/redisModel.go index f86dbb638..385165687 100644 --- a/src/common/db/redisModel.go +++ b/src/common/db/redisModel.go @@ -9,6 +9,7 @@ const ( userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq appleDeviceToken = "DEVICE_TOKEN" lastGetSeq = "LAST_GET_SEQ" + userMinSeq = "REDIS_USER_Min_SEQ:" ) func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) { @@ -37,12 +38,25 @@ func (d *DataBases) IncrUserSeq(uid string) (int64, error) { return redis.Int64(d.Exec("INCR", key)) } -//获取最新的seq -func (d *DataBases) GetUserSeq(uid string) (int64, error) { +//获取最大的Seq +func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) { key := userIncrSeq + uid return redis.Int64(d.Exec("GET", key)) } +//设置用户最小的seq +func (d *DataBases) SetUserMinSeq(uid string) (err error) { + key := userMinSeq + uid + _, err = d.Exec("SET", key) + return err +} + +//获取最小的Seq +func (d *DataBases) GetUserMinSeq(uid string) (int64, error) { + key := userMinSeq + uid + return redis.Int64(d.Exec("GET", key)) +} + //存储苹果的设备token到redis func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) { key := appleDeviceToken + accountAddress diff --git a/src/msg_gateway/gate/logic.go b/src/msg_gateway/gate/logic.go index 9ce7db745..0d1a2b5e9 100644 --- a/src/msg_gateway/gate/logic.go +++ b/src/msg_gateway/gate/logic.go @@ -68,9 +68,10 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { log.NewInfo("", "goroutine num is ", runtime.NumGoroutine()) } -func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) { - var mReplyData pbWs.GetNewSeqResp - mReplyData.Seq = pb.GetSeq() +func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) { + var mReplyData pbWs.GetMaxAndMinSeqResp + mReplyData.MaxSeq = pb.GetMaxSeq() + mReplyData.MinSeq = pb.GetMinSeq() b, _ := proto.Marshal(&mReplyData) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, @@ -84,7 +85,7 @@ func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqRes } func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m) - pbData := pbChat.GetNewSeqReq{} + pbData := pbChat.GetMaxAndMinSeqReq{} pbData.UserID = m.SendID pbData.OperationID = m.OperationID grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) @@ -92,7 +93,7 @@ func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m) } msgClient := pbChat.NewChatClient(grpcConn) - reply, err := msgClient.GetNewSeq(context.Background(), &pbData) + reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData) if err != nil { log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String()) return diff --git a/src/rpc/chat/chat/pull_message.go b/src/rpc/chat/chat/pull_message.go index 359b3709f..884e1dd9a 100644 --- a/src/rpc/chat/chat/pull_message.go +++ b/src/rpc/chat/chat/pull_message.go @@ -13,22 +13,30 @@ import ( pbMsg "Open_IM/src/proto/chat" ) -func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) { - log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String()) +func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) { + log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String()) //seq, err := model.GetBiggestSeqFromReceive(in.UserID) - seq, err := commonDB.DB.GetUserSeq(in.UserID) - resp := new(pbMsg.GetNewSeqResp) - if err == nil { - resp.Seq = seq + maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID) + minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID) + resp := new(pbMsg.GetMaxAndMinSeqResp) + if err1 == nil && err2 == nil { + resp.MaxSeq = maxSeq + resp.MinSeq = minSeq resp.ErrCode = 0 resp.ErrMsg = "" - return resp, err + return resp, nil } else { - if err == redis.ErrNil { - resp.Seq = 0 - } else { - log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error()) - resp.Seq = -1 + if err1 == redis.ErrNil { + resp.MaxSeq = 0 + } else if err1 != nil { + log.NewInfo(in.OperationID, "getMaxSeq from redis error", in.String(), err1.Error()) + resp.MaxSeq = -1 + } + if err2 == redis.ErrNil { + resp.MinSeq = 0 + } else if err2 != nil { + log.NewInfo(in.OperationID, "getMinSeq from redis error", in.String(), err2.Error()) + resp.MinSeq = -1 } resp.ErrCode = 0 resp.ErrMsg = "" @@ -81,7 +89,6 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessag SingleUserMsg: respSingleMsgFormat, GroupUserMsg: respGroupMsgFormat, }, nil - panic("implement me") } func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat { var userid string From 4ce0c6be50b77fb5023845d07af434c6dcbbaf43 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 26 Oct 2021 11:08:03 +0800 Subject: [PATCH 2/3] api change --- src/api/chat/{newest_seq.go => get_max_min_seq.go} | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename src/api/chat/{newest_seq.go => get_max_min_seq.go} (90%) diff --git a/src/api/chat/newest_seq.go b/src/api/chat/get_max_min_seq.go similarity index 90% rename from src/api/chat/newest_seq.go rename to src/api/chat/get_max_min_seq.go index 9fa7e1836..73d98b4ef 100644 --- a/src/api/chat/newest_seq.go +++ b/src/api/chat/get_max_min_seq.go @@ -31,7 +31,7 @@ func UserNewestSeq(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"}) return } - pbData := pbMsg.GetNewSeqReq{} + pbData := pbMsg.GetMaxAndMinSeqReq{} pbData.UserID = params.SendID pbData.OperationID = params.OperationID grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) @@ -40,7 +40,7 @@ func UserNewestSeq(c *gin.Context) { log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", params) } msgClient := pbMsg.NewChatClient(grpcConn) - reply, err := msgClient.GetNewSeq(context.Background(), &pbData) + reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData) if err != nil { log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String()) return @@ -52,7 +52,8 @@ func UserNewestSeq(c *gin.Context) { "msgIncr": params.MsgIncr, "reqIdentifier": params.ReqIdentifier, "data": gin.H{ - "seq": reply.Seq, + "maxSeq": reply.MaxSeq, + "minSeq": reply.MinSeq, }, }) From bd31d4ff9cd085338bd834cd441a8fe47738697b Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 26 Oct 2021 12:02:07 +0800 Subject: [PATCH 3/3] shell modify --- script/check_all.sh | 38 ++++++++++++++++++++++++------------ script/msg_gateway_start.sh | 2 +- script/msg_transfer_start.sh | 8 +------- script/push_start.sh | 2 +- script/sdk_svr_start.sh | 2 +- 5 files changed, 29 insertions(+), 23 deletions(-) diff --git a/script/check_all.sh b/script/check_all.sh index dd8042fde..3c57577c4 100644 --- a/script/check_all.sh +++ b/script/check_all.sh @@ -15,17 +15,29 @@ service_port_name=( openImWsPort openImSdkWsPort ) - for i in ${service_port_name[*]};do - list=$(cat $config_path | grep -w ${i} | awk -F '[:]' '{print $NF}') - list_to_string $list - for j in ${ports_array};do - port=`netstat -netulp |grep ./open_im| awk '{print $4}'|grep -w ${j}|awk -F '[:]' '{print $NF}'` - if [[ ${port} -ne ${j} ]]; then - echo -e ${YELLOW_PREFIX}${i}${COLOR_SUFFIX}${RED_PREFIX}" service does not start normally,not initiated port is "${COLOR_SUFFIX}${YELLOW_PREFIX}${j}${COLOR_SUFFIX} - exit 1 - else - echo -e ${j}${GREEN_PREFIX}" port has been listening,belongs service is "${i}${COLOR_SUFFIX} - fi - done +for i in ${service_port_name[*]}; do + list=$(cat $config_path | grep -w ${i} | awk -F '[:]' '{print $NF}') + list_to_string $list + for j in ${ports_array}; do + port=$(netstat -netulp | grep ./open_im | awk '{print $4}' | grep -w ${j} | awk -F '[:]' '{print $NF}') + if [[ ${port} -ne ${j} ]]; then + echo -e ${YELLOW_PREFIX}${i}${COLOR_SUFFIX}${RED_PREFIX}" service does not start normally,not initiated port is "${COLOR_SUFFIX}${YELLOW_PREFIX}${j}${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} + exit -1 + else + echo -e ${j}${GREEN_PREFIX}" port has been listening,belongs service is "${i}${COLOR_SUFFIX} + fi done - echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} +done + +#Check launched service process +check=$(ps aux | grep -w ./${msg_transfer_name} | grep -v grep | wc -l) +if [ $check -eq ${msg_transfer_service_num} ]; then + echo -e ${GREEN_PREFIX}"service has been starting,belongs service is openImMsgTransfer"${COLOR_SUFFIX} +else + echo -e ${RED_PREFIX}"openImMsgTransfer service does not start normally, num err"${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} + exit -1 +fi + +echo -e ${YELLOW_PREFIX}"all services launch success"${COLOR_SUFFIX} diff --git a/script/msg_gateway_start.sh b/script/msg_gateway_start.sh index 197c28743..1df53cfdd 100644 --- a/script/msg_gateway_start.sh +++ b/script/msg_gateway_start.sh @@ -44,5 +44,5 @@ if [ $check -ge 1 ]; then echo -e ${SKY_BLUE_PREFIX}"PID: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allNewPid}${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"LISTENING_PORT: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allPorts}${COLOR_SUFFIX} else - echo -e ${YELLOW_PREFIX}${msg_gateway_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR !!! PLEASE CHECK ERROR LOG"${COLOR_SUFFIX} + echo -e ${YELLOW_PREFIX}${msg_gateway_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR, PLEASE CHECK openIM.log"${COLOR_SUFFIX} fi diff --git a/script/msg_transfer_start.sh b/script/msg_transfer_start.sh index 9caeed9f2..4d7fb96cd 100644 --- a/script/msg_transfer_start.sh +++ b/script/msg_transfer_start.sh @@ -26,17 +26,11 @@ check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l` if [ $check -ge 1 ] then newPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'` -ports=`netstat -netulp | grep -w ${newPid}|awk '{print $4}'|awk -F '[:]' '{print $NF}'` allPorts="" - -for i in $ports ; -do - allPorts=${allPorts}"$i " -done echo -e ${SKY_BLUE_PREFIX}"SERVICE START SUCCESS "${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"SERVICE_NAME: "${COLOR_SUFFIX}${YELLOW_PREFIX}${msg_transfer_name}${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"PID: "${COLOR_SUFFIX}${YELLOW_PREFIX}${newPid}${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"LISTENING_PORT: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allPorts}${COLOR_SUFFIX} else - echo -e ${YELLOW_PREFIX}${msg_transfer_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR !!! PLEASE CHECK ERROR LOG"${COLOR_SUFFIX} + echo -e ${YELLOW_PREFIX}${msg_transfer_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR, PLEASE CHECK openIM.log"${COLOR_SUFFIX} fi diff --git a/script/push_start.sh b/script/push_start.sh index 9c7812dab..8b2a47896 100644 --- a/script/push_start.sh +++ b/script/push_start.sh @@ -41,5 +41,5 @@ if [ $check -ge 1 ]; then echo -e ${SKY_BLUE_PREFIX}"PID: "${COLOR_SUFFIX}${YELLOW_PREFIX}${newPid}${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"LISTENING_PORT: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allPorts}${COLOR_SUFFIX} else - echo -e ${YELLOW_PREFIX}${push_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR !!! PLEASE CHECK ERROR LOG"${COLOR_SUFFIX} + echo -e ${YELLOW_PREFIX}${push_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR, PLEASE CHECK openIM.log"${COLOR_SUFFIX} fi diff --git a/script/sdk_svr_start.sh b/script/sdk_svr_start.sh index 28d764b73..97c694233 100644 --- a/script/sdk_svr_start.sh +++ b/script/sdk_svr_start.sh @@ -43,5 +43,5 @@ if [ $check -ge 1 ]; then echo -e ${SKY_BLUE_PREFIX}"PID: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allNewPid}${COLOR_SUFFIX} echo -e ${SKY_BLUE_PREFIX}"LISTENING_PORT: "${COLOR_SUFFIX}${YELLOW_PREFIX}${allPorts}${COLOR_SUFFIX} else - echo -e ${YELLOW_PREFIX}${sdk_server_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR !!! PLEASE CHECK ERROR LOG"${COLOR_SUFFIX} + echo -e ${YELLOW_PREFIX}${sdk_server_name}${COLOR_SUFFIX}${RED_PREFIX}"SERVICE START ERROR PLEASE CHECK openIM.log"${COLOR_SUFFIX} fi