From 205229ee604fd856c670fa0651e4855487179e15 Mon Sep 17 00:00:00 2001
From: Gordon <1432970085@qq.com>
Date: Tue, 6 Jul 2021 20:04:57 +0800
Subject: [PATCH] persistent message modify

---
 src/api/open_im_api.go                        |  3 ++-
 src/common/log/logrus.go                      |  1 -
 .../logic/persistent_msg_handler.go           | 23 ++++++++++++++-----
 src/push/logic/push_to_client.go              |  4 ++--
 src/rpc/chat/chat/send_msg.go                 |  4 ++--
 5 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/src/api/open_im_api.go b/src/api/open_im_api.go
index 8a8d7ec16..3a13d1c4a 100644
--- a/src/api/open_im_api.go
+++ b/src/api/open_im_api.go
@@ -24,7 +24,8 @@ func main() {
 		return
 	}
 	syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd()))
-
+	//gin.SetMode(gin.ReleaseMode)
+	//gin.DefaultWriter = ioutil.Discard
 	log.Info("", "", "api server running...")
 	r := gin.Default()
 	r.Use(utils.CorsHandler())
diff --git a/src/common/log/logrus.go b/src/common/log/logrus.go
index aa5bbcd05..09301ab24 100644
--- a/src/common/log/logrus.go
+++ b/src/common/log/logrus.go
@@ -62,7 +62,6 @@ func NewLfsHook(rotationTime time.Duration, maxRemainNum uint, moduleName string
 		HideKeys:        false,
 		FieldsOrder:     []string{"PID"},
 	})
-
 	return lfsHook
 }
 func initRotateLogs(rotationTime time.Duration, maxRemainNum uint, level string, moduleName string) *rotatelogs.RotateLogs {
diff --git a/src/msg_transfer/logic/persistent_msg_handler.go b/src/msg_transfer/logic/persistent_msg_handler.go
index 9a54571f4..fbbe8b1e3 100644
--- a/src/msg_transfer/logic/persistent_msg_handler.go
+++ b/src/msg_transfer/logic/persistent_msg_handler.go
@@ -8,6 +8,7 @@ package logic
 
 import (
 	"Open_IM/src/common/config"
+	"Open_IM/src/common/constant"
 	"Open_IM/src/common/db/mysql_model/im_mysql_msg_model"
 	kfk "Open_IM/src/common/kafka"
 	"Open_IM/src/common/log"
@@ -15,6 +16,7 @@ import (
 	"Open_IM/src/utils"
 	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
+	"strings"
 )
 
 type PersistentConsumerHandler struct {
@@ -42,14 +44,23 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey strin
 	//Control whether to store history messages (mysql)
 	isPersist := utils.GetSwitchFromOptions(Options, "persistent")
 	//Only process receiver data
-	if isPersist && msgKey == pbData.RecvID {
-		log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
-		if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
-			log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
-			return
+	if isPersist {
+		if msgKey == pbData.RecvID && pbData.SessionType == constant.SingleChatType {
+			log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
+			if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
+				log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
+				return
+			}
+		} else if pbData.SessionType == constant.GroupChatType && msgKey == "0" {
+			pbData.RecvID = strings.Split(pbData.RecvID, " ")[1]
+			log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
+			if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
+				log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
+				return
+			}
 		}
-	}
 
+	}
 }
 func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
 func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
diff --git a/src/push/logic/push_to_client.go b/src/push/logic/push_to_client.go
index 564b6741d..11072f184 100644
--- a/src/push/logic/push_to_client.go
+++ b/src/push/logic/push_to_client.go
@@ -131,9 +131,9 @@ func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {
 			return
 		}
 		groupID := m.RecvID
-		for _, v := range reply.MemberList {
+		for i, v := range reply.MemberList {
 			m.RecvID = v.UserId + " " + groupID
-			sendMsgToKafka(m, m.RecvID, "msgKey--recvID+\" \"+groupID")
+			sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID")
 		}
 	default:
 
diff --git a/src/rpc/chat/chat/send_msg.go b/src/rpc/chat/chat/send_msg.go
index 7b076bade..27d0a7a8f 100644
--- a/src/rpc/chat/chat/send_msg.go
+++ b/src/rpc/chat/chat/send_msg.go
@@ -111,9 +111,9 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
 				return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0)
 			}
 			groupID := pbData.RecvID
-			for _, v := range reply.MemberList {
+			for i, v := range reply.MemberList {
 				pbData.RecvID = v.UserId + " " + groupID
-				rpc.sendMsgToKafka(&pbData, pbData.RecvID)
+				rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
 			}
 			return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
 		default: