mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Reduce memory copies
This commit is contained in:
parent
2fc6b0a8cb
commit
d276be9594
@ -277,11 +277,15 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
||||
remain := len(v) % split
|
||||
for i := 0; i < len(v)/split; i++ {
|
||||
wg.Add(1)
|
||||
go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
|
||||
tmp := valueCopy(pb)
|
||||
// go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
|
||||
go rpc.sendMsgToGroupOptimization(v[i*split:(i+1)*split], tmp, k, &sendTag, &wg)
|
||||
}
|
||||
if remain > 0 {
|
||||
wg.Add(1)
|
||||
go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
|
||||
tmp := valueCopy(pb)
|
||||
// go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
|
||||
go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg)
|
||||
}
|
||||
}
|
||||
wg.Add(1)
|
||||
@ -431,6 +435,29 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i
|
||||
return true
|
||||
}
|
||||
|
||||
func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool {
|
||||
conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
|
||||
opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID)
|
||||
if err != nil && err != redis.ErrNil {
|
||||
log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
switch opt {
|
||||
case constant.ReceiveMessage:
|
||||
return true
|
||||
case constant.NotReceiveMessage:
|
||||
return false
|
||||
case constant.ReceiveNotNotifyMessage:
|
||||
if *options == nil {
|
||||
*options = make(map[string]bool, 10)
|
||||
}
|
||||
utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type NotificationMsg struct {
|
||||
SendID string
|
||||
RecvID string
|
||||
@ -748,6 +775,23 @@ func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, o
|
||||
m[constant.OfflineStatus] = offlUserIDList
|
||||
}
|
||||
|
||||
func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq {
|
||||
offlinePushInfo := sdk_ws.OfflinePushInfo{}
|
||||
if pb.MsgData.OfflinePushInfo != nil {
|
||||
offlinePushInfo = *pb.MsgData.OfflinePushInfo
|
||||
}
|
||||
msgData := sdk_ws.MsgData{}
|
||||
msgData = *pb.MsgData
|
||||
msgData.OfflinePushInfo = &offlinePushInfo
|
||||
|
||||
options := make(map[string]bool, 10)
|
||||
for key, value := range pb.MsgData.Options {
|
||||
options[key] = value
|
||||
}
|
||||
msgData.Options = options
|
||||
return &pbChat.SendMsgReq{Token: "", OperationID: pb.OperationID, MsgData: &msgData}
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
|
||||
// log.Debug(pb.OperationID, "split userID ", list)
|
||||
offlinePushInfo := sdk_ws.OfflinePushInfo{}
|
||||
@ -783,3 +827,22 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
|
||||
msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData}
|
||||
for _, v := range list {
|
||||
groupPB.MsgData.RecvID = v
|
||||
isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB)
|
||||
if isSend {
|
||||
err := rpc.sendMsgToKafka(&msgToMQGroup, v, status)
|
||||
if err != nil {
|
||||
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
|
||||
} else {
|
||||
*sendTag = true
|
||||
}
|
||||
} else {
|
||||
log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
@ -124,6 +124,7 @@ func GetSwitchFromOptions(Options map[string]bool, key string) (result bool) {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func SetSwitchFromOptions(options map[string]bool, key string, value bool) {
|
||||
if options == nil {
|
||||
options = make(map[string]bool, 5)
|
||||
|
Loading…
x
Reference in New Issue
Block a user