From b36c041d35ad94db4c0186005bf93b0d77b94d11 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 6 Aug 2021 14:56:41 +0800 Subject: [PATCH] message update --- src/common/log/logrus.go | 4 +-- src/msg_gateway/open_im_msg_gateway.go | 2 +- src/msg_transfer/logic/history_msg_handler.go | 16 +++++++--- src/proto/chat/chat.proto | 4 +-- src/rpc/chat/chat/send_msg.go | 31 +++++++++++++------ src/utils/time_format.go | 5 +++ 6 files changed, 43 insertions(+), 19 deletions(-) diff --git a/src/common/log/logrus.go b/src/common/log/logrus.go index 09301ab24..48c781c68 100644 --- a/src/common/log/logrus.go +++ b/src/common/log/logrus.go @@ -32,7 +32,7 @@ func loggerInit(moduleName string) *Logger { logger.SetLevel(logrus.TraceLevel) //Log Style Setting logger.SetFormatter(&nested.Formatter{ - TimestampFormat: "2006-01-02 15:04:05", + TimestampFormat: "2006-01-02 15:04:05.000", HideKeys: false, FieldsOrder: []string{"PID"}, }) @@ -58,7 +58,7 @@ func NewLfsHook(rotationTime time.Duration, maxRemainNum uint, moduleName string logrus.WarnLevel: initRotateLogs(rotationTime, maxRemainNum, "warn", moduleName), logrus.ErrorLevel: initRotateLogs(rotationTime, maxRemainNum, "error", moduleName), }, &nested.Formatter{ - TimestampFormat: "2006-01-02 15:04:05", + TimestampFormat: "2006-01-02 15:04:05.000", HideKeys: false, FieldsOrder: []string{"PID"}, }) diff --git a/src/msg_gateway/open_im_msg_gateway.go b/src/msg_gateway/open_im_msg_gateway.go index b125d0ce4..364a7568d 100644 --- a/src/msg_gateway/open_im_msg_gateway.go +++ b/src/msg_gateway/open_im_msg_gateway.go @@ -8,7 +8,7 @@ import ( func main() { rpcPort := flag.Int("rpc_port", 10500, "rpc listening port") - wsPort := flag.Int("ws_port", 10800, "rpc listening port") + wsPort := flag.Int("ws_port", 10800, "ws listening port") flag.Parse() var wg sync.WaitGroup wg.Add(1) diff --git a/src/msg_transfer/logic/history_msg_handler.go b/src/msg_transfer/logic/history_msg_handler.go index 9a585eca2..eedf274ac 100644 --- a/src/msg_transfer/logic/history_msg_handler.go +++ b/src/msg_transfer/logic/history_msg_handler.go @@ -66,9 +66,6 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) if err != nil { log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error()) } - pbSaveData.Options = pbData.Options - pbSaveData.OfflineInfo = pbData.OfflineInfo - sendMessageToPush(&pbSaveData) } else if msgKey == pbSaveData.SendID { err := saveUserChat(pbData.SendID, &pbSaveData) if err != nil { @@ -82,11 +79,19 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) } } + if msgKey == pbSaveData.RecvID { + pbSaveData.Options = pbData.Options + pbSaveData.OfflineInfo = pbData.OfflineInfo + sendMessageToPush(&pbSaveData) + } + log.InfoByKv("msg_transfer handle topic success...", "", "") } else if pbData.SessionType == constant.GroupChatType { log.Info("", "", "msg_transfer chat type = GroupChatType") - uidAndGroupID := strings.Split(pbData.RecvID, " ") - saveUserChat(uidAndGroupID[0], &pbSaveData) + if isHistory { + uidAndGroupID := strings.Split(pbData.RecvID, " ") + saveUserChat(uidAndGroupID[0], &pbSaveData) + } pbSaveData.Options = pbData.Options pbSaveData.OfflineInfo = pbData.OfflineInfo sendMessageToPush(&pbSaveData) @@ -108,6 +113,7 @@ func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, return nil } func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { + log.InfoByKv("msg_transfer send message to push", message.OperationID, "message", message.String()) msg := pbPush.PushMsgReq{} msg.OperationID = message.OperationID msg.PlatformID = message.PlatformID diff --git a/src/proto/chat/chat.proto b/src/proto/chat/chat.proto index 3aedc42d1..921f14cb1 100644 --- a/src/proto/chat/chat.proto +++ b/src/proto/chat/chat.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package pbChat; -option go_package = "chat;pbChat"; +package pbChat;//The package name to which the proto file belongs +option go_package = "chat;pbChat";//The generated go pb file is in the current directory, and the package name is pbChat message WSToMsgSvrChatMsg{ string SendID = 1; diff --git a/src/rpc/chat/chat/send_msg.go b/src/rpc/chat/chat/send_msg.go index c1764ab3a..5191e662c 100644 --- a/src/rpc/chat/chat/send_msg.go +++ b/src/rpc/chat/chat/send_msg.go @@ -41,6 +41,7 @@ type MsgCallBackResp struct { } func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { + log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String()) serverMsgID := GetMsgID(pb.SendID) pbData := pbChat.WSToMsgSvrChatMsg{} pbData.MsgFrom = pb.MsgFrom @@ -59,7 +60,7 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* pbData.MsgID = serverMsgID pbData.OperationID = pb.OperationID pbData.Token = pb.Token - pbData.SendTime = utils.GetCurrentTimestampBySecond() + pbData.SendTime = utils.GetCurrentTimestampByNano() replay := pbChat.UserSendMsgResp{} m := MsgCallBackResp{} if config.Config.MessageCallBack.CallbackSwitch { @@ -84,16 +85,22 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0) } else { pbData.Content = m.ResponseResult.ModifiedMsg - rpc.sendMsgToKafka(&pbData, pbData.RecvID) - rpc.sendMsgToKafka(&pbData, pbData.SendID) + err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) + if err1 != nil || err2 != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) } } } else { switch pbData.SessionType { case constant.SingleChatType: - rpc.sendMsgToKafka(&pbData, pbData.RecvID) - rpc.sendMsgToKafka(&pbData, pbData.SendID) + err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) + if err1 != nil || err2 != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) case constant.GroupChatType: etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) @@ -138,11 +145,17 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* groupID := pbData.RecvID for i, v := range reply.MemberList { pbData.RecvID = v.UserId + " " + groupID - rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + if err != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } } for i, v := range addUidList { pbData.RecvID = v + " " + groupID - rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) + if err != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) default: @@ -153,12 +166,12 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) { +func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error { pid, offset, err := rpc.producer.SendMessage(m, key) if err != nil { log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) } - + return err } func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") diff --git a/src/utils/time_format.go b/src/utils/time_format.go index 91e6ddd3b..3accd0567 100644 --- a/src/utils/time_format.go +++ b/src/utils/time_format.go @@ -26,6 +26,11 @@ func UnixSecondToTime(second int64) time.Time { return time.Unix(second, 0) } +//Convert nano timestamp to time.Time type +func UnixNanoSecondToTime(nanoSecond int64) time.Time { + return time.Unix(0, nanoSecond) +} + //Get the current timestamp by Nano func GetCurrentTimestampByNano() int64 { return time.Now().UnixNano()