diff --git a/src/api/open_im_api.go b/src/api/open_im_api.go index 8a8d7ec16..3a13d1c4a 100644 --- a/src/api/open_im_api.go +++ b/src/api/open_im_api.go @@ -24,7 +24,8 @@ func main() { return } syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd())) - + //gin.SetMode(gin.ReleaseMode) + //gin.DefaultWriter = ioutil.Discard log.Info("", "", "api server running...") r := gin.Default() r.Use(utils.CorsHandler()) diff --git a/src/common/log/logrus.go b/src/common/log/logrus.go index aa5bbcd05..09301ab24 100644 --- a/src/common/log/logrus.go +++ b/src/common/log/logrus.go @@ -62,7 +62,6 @@ func NewLfsHook(rotationTime time.Duration, maxRemainNum uint, moduleName string HideKeys: false, FieldsOrder: []string{"PID"}, }) - return lfsHook } func initRotateLogs(rotationTime time.Duration, maxRemainNum uint, level string, moduleName string) *rotatelogs.RotateLogs { diff --git a/src/msg_transfer/logic/persistent_msg_handler.go b/src/msg_transfer/logic/persistent_msg_handler.go index 9a54571f4..fbbe8b1e3 100644 --- a/src/msg_transfer/logic/persistent_msg_handler.go +++ b/src/msg_transfer/logic/persistent_msg_handler.go @@ -8,6 +8,7 @@ package logic import ( "Open_IM/src/common/config" + "Open_IM/src/common/constant" "Open_IM/src/common/db/mysql_model/im_mysql_msg_model" kfk "Open_IM/src/common/kafka" "Open_IM/src/common/log" @@ -15,6 +16,7 @@ import ( "Open_IM/src/utils" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + "strings" ) type PersistentConsumerHandler struct { @@ -42,14 +44,23 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey strin //Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(Options, "persistent") //Only process receiver data - if isPersist && msgKey == pbData.RecvID { - log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) - if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { - log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) - return + if isPersist { + if msgKey == pbData.RecvID && pbData.SessionType == constant.SingleChatType { + log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) + if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { + log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) + return + } + } else if pbData.SessionType == constant.GroupChatType && msgKey == "0" { + pbData.RecvID = strings.Split(pbData.RecvID, " ")[1] + log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) + if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { + log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) + return + } } - } + } } func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/src/push/logic/push_to_client.go b/src/push/logic/push_to_client.go index 564b6741d..11072f184 100644 --- a/src/push/logic/push_to_client.go +++ b/src/push/logic/push_to_client.go @@ -131,9 +131,9 @@ func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { return } groupID := m.RecvID - for _, v := range reply.MemberList { + for i, v := range reply.MemberList { m.RecvID = v.UserId + " " + groupID - sendMsgToKafka(m, m.RecvID, "msgKey--recvID+\" \"+groupID") + sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID") } default: diff --git a/src/rpc/chat/chat/send_msg.go b/src/rpc/chat/chat/send_msg.go index 7b076bade..27d0a7a8f 100644 --- a/src/rpc/chat/chat/send_msg.go +++ b/src/rpc/chat/chat/send_msg.go @@ -111,9 +111,9 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0) } groupID := pbData.RecvID - for _, v := range reply.MemberList { + for i, v := range reply.MemberList { pbData.RecvID = v.UserId + " " + groupID - rpc.sendMsgToKafka(&pbData, pbData.RecvID) + rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) default: