add super group

This commit is contained in:
Gordon 2022-05-28 18:10:08 +08:00
parent e9bf26e0ca
commit 7285e2b2c9
9 changed files with 179 additions and 398 deletions

View File

@ -64,15 +64,19 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
} }
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data) log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data)
rpcReq := pbChat.GetMaxAndMinSeqReq{} nReply := new(sdk_ws.GetMaxAndMinSeqResp)
nReply := new(pbChat.GetMaxAndMinSeqResp) isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq)
if isPass {
rpcReq := sdk_ws.GetMaxAndMinSeqReq{}
rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList
rpcReq.UserID = m.SendID rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID rpcReq.OperationID = m.OperationID
log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList)
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn) msgClient := pbChat.NewChatClient(grpcConn)
rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
if err != nil { if err != nil {
log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err, rpcReq.String()) log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String())
nReply.ErrCode = 500 nReply.ErrCode = 500
nReply.ErrMsg = err.Error() nReply.ErrMsg = err.Error()
ws.getSeqResp(conn, m, nReply) ws.getSeqResp(conn, m, nReply)
@ -80,12 +84,17 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String()) log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
ws.getSeqResp(conn, m, rpcReply) ws.getSeqResp(conn, m, rpcReply)
} }
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.getSeqResp(conn, m, nReply)
}
} }
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) { func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
var mReplyData sdk_ws.GetMaxAndMinSeqResp log.Debug(m.OperationID, "getSeqResp come here ", pb.String())
mReplyData.MaxSeq = pb.GetMaxSeq() b, _ := proto.Marshal(pb)
mReplyData.MinSeq = pb.GetMinSeq()
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr, MsgIncr: m.MsgIncr,
@ -146,6 +155,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgAllCountLock.Lock() sendMsgAllCountLock.Lock()
sendMsgAllCount++ sendMsgAllCount++
sendMsgAllCountLock.Unlock() sendMsgAllCountLock.Unlock()
//stat.GaugeVecApiMethod.WithLabelValues("ws_send_message_count").Inc()
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
nReply := new(pbChat.SendMsgResp) nReply := new(pbChat.SendMsgResp)

View File

@ -59,6 +59,18 @@ type SeqListData struct {
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) { func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
switch r { switch r {
case constant.WSGetNewestSeq:
data := open_im_sdk.GetMaxAndMinSeqReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error("", "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error("", "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WSSendMsg: case constant.WSSendMsg:
data := open_im_sdk.MsgData{} data := open_im_sdk.MsgData{}
if err := proto.Unmarshal(m.Data, &data); err != nil { if err := proto.Unmarshal(m.Data, &data); err != nil {

View File

@ -14,14 +14,13 @@ const OnlineTopicBusy = 1
const OnlineTopicVacancy = 0 const OnlineTopicVacancy = 0
const Msg = 2 const Msg = 2
const ConsumerMsgs = 3 const ConsumerMsgs = 3
const UserMessages = 4 const AggregationMessages = 4
const MongoMessages = 5 const MongoMessages = 5
const ChannelNum = 100 const ChannelNum = 100
var ( var (
persistentCH PersistentConsumerHandler persistentCH PersistentConsumerHandler
historyCH OnlineHistoryConsumerHandler historyCH OnlineHistoryConsumerHandler
offlineHistoryCH OfflineHistoryConsumerHandler
producer *kafka.Producer producer *kafka.Producer
cmdCh chan Cmd2Value cmdCh chan Cmd2Value
onlineTopicStatus int onlineTopicStatus int

View File

@ -1,313 +0,0 @@
package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"time"
)
type OfflineHistoryConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
cmdCh chan Cmd2Value
msgCh chan Cmd2Value
chArrays [ChannelNum]chan Cmd2Value
msgDistributionCh chan Cmd2Value
}
func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
mc.msgHandle = make(map[string]fcb)
mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
go mc.MessagesDistributionHandle()
mc.cmdCh = cmdCh
mc.msgCh = make(chan Cmd2Value, 1000)
for i := 0; i < ChannelNum; i++ {
mc.chArrays[i] = make(chan Cmd2Value, 1000)
go mc.Run(i)
}
if config.Config.ReliableStorage {
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
} else {
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
}
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)
}
func (och *OfflineHistoryConsumerHandler) Run(channelID int) {
for {
select {
case cmd := <-och.chArrays[channelID]:
switch cmd.Cmd {
case UserMessages:
msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
for _, v := range msgList {
log.Debug(triggerID, "msg come to storage center", v.String())
isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
if isHistory {
storageMsgList = append(storageMsgList, v)
}
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
pushMsgList = append(pushMsgList, v)
}
}
//switch msgChannelValue.msg.MsgData.SessionType {
//case constant.SingleChatType:
//case constant.GroupChatType:
//case constant.NotificationChatType:
//default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
//}
err, _ := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
if err != nil {
singleMsgFailedCount += uint64(len(storageMsgList))
log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList)
} else {
singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount += uint64(len(storageMsgList))
singleMsgSuccessCountMutex.Unlock()
for _, v := range pushMsgList {
sendMessageToPush(v, msgChannelValue.userID)
}
}
}
}
}
}
func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() {
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
for {
select {
case cmd := <-och.msgDistributionCh:
switch cmd.Cmd {
case ConsumerMsgs:
triggerChannelValue := cmd.Value.(TriggerChannelValue)
triggerID := triggerChannelValue.triggerID
consumerMessages := triggerChannelValue.cmsgList
//Aggregation map[userid]message list
log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
return
}
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String())
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
} else {
m := make([]*pbMsg.MsgDataToMQ, 0, 100)
m = append(m, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = m
}
}
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
for userID, v := range UserAggregationMsgs {
if len(v) >= 0 {
channelID := getHashCode(userID) % ChannelNum
go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}}
}(channelID, userID, v)
}
}
}
}
}
}
func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
singleMsgFailedCount++
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount++
singleMsgSuccessCountMutex.Unlock()
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now))
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
if err != nil {
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
return
}
groupMsgCount++
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now))
case constant.NotificationChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
}
sess.MarkMessage(cMsg, "")
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
}
func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
//mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} else {
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
go sendMessageToPush(&msgFromMQ, msgKey)
}
}
}
func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
// //for msg := range claim.Messages() {
// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline")
// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
// //}
// for msg := range claim.Messages() {
// if GetOnlineTopicStatus() == OnlineTopicVacancy {
// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// } else {
// select {
// case <-mc.cmdCh:
// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// case <-time.After(time.Millisecond * time.Duration(100)):
// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// }
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// }
// }
//
// return nil
//}
func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 500)
t := time.NewTicker(time.Duration(500) * time.Millisecond)
var triggerID string
for msg := range claim.Messages() {
//och.TriggerCmd(OnlineTopicBusy)
cMsg = append(cMsg, msg)
select {
case <-t.C:
if len(cMsg) >= 0 {
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: cMsg}}
sess.MarkMessage(msg, "")
cMsg = cMsg[0:0]
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
}
default:
if len(cMsg) >= 500 {
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "length trigger msg consumer start", len(cMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: cMsg}}
sess.MarkMessage(msg, "")
cMsg = cMsg[0:0]
log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
}
}
log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
}
return nil
}

View File

@ -22,7 +22,7 @@ import (
) )
type MsgChannelValue struct { type MsgChannelValue struct {
userID string aggregationID string //maybe userID or super groupID
triggerID string triggerID string
msgList []*pbMsg.MsgDataToMQ msgList []*pbMsg.MsgDataToMQ
lastSeq uint64 lastSeq uint64
@ -98,13 +98,13 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
select { select {
case cmd := <-och.chArrays[channelID]: case cmd := <-och.chArrays[channelID]:
switch cmd.Cmd { switch cmd.Cmd {
case UserMessages: case AggregationMessages:
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID triggerID := msgChannelValue.triggerID
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList))
for _, v := range msgList { for _, v := range msgList {
log.Debug(triggerID, "msg come to storage center", v.String()) log.Debug(triggerID, "msg come to storage center", v.String())
isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
@ -113,9 +113,10 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
storageMsgList = append(storageMsgList, v) storageMsgList = append(storageMsgList, v)
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
} else { } else {
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) {
notStoragepushMsgList = append(notStoragepushMsgList, v) notStoragePushMsgList = append(notStoragePushMsgList, v)
} }
} }
} }
@ -128,8 +129,8 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return // return
//} //}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList)) log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) err, lastSeq := saveUserChatList(msgChannelValue.aggregationID, storageMsgList, triggerID)
if err != nil { if err != nil {
singleMsgFailedCount += uint64(len(storageMsgList)) singleMsgFailedCount += uint64(len(storageMsgList))
log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
@ -137,28 +138,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
singleMsgSuccessCountMutex.Lock() singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCount += uint64(len(storageMsgList))
singleMsgSuccessCountMutex.Unlock() singleMsgSuccessCountMutex.Unlock()
och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
go func(push, storage []*pbMsg.MsgDataToMQ) { go func(push, storage []*pbMsg.MsgDataToMQ) {
for _, v := range storage { for _, v := range storage {
sendMessageToPush(v, msgChannelValue.userID) sendMessageToPush(v, msgChannelValue.aggregationID)
} }
for _, x := range push { for _, x := range push {
sendMessageToPush(x, msgChannelValue.userID) sendMessageToPush(x, msgChannelValue.aggregationID)
} }
}(notStoragepushMsgList, storageMsgList) }(notStoragePushMsgList, storageMsgList)
} }
} }
} }
} }
} }
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
hashCode := getHashCode(userID) hashCode := getHashCode(aggregationID)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
} }
func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
for { for {
@ -169,9 +170,9 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID triggerID := msgChannelValue.triggerID
userID := msgChannelValue.userID aggregationID := msgChannelValue.aggregationID
lastSeq := msgChannelValue.lastSeq lastSeq := msgChannelValue.lastSeq
err := db.DB.BatchInsertChat2DB(userID, msgList, triggerID, lastSeq) err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
if err != nil { if err != nil {
log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
} }
@ -202,7 +203,7 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
for { for {
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
select { select {
case cmd := <-och.msgDistributionCh: case cmd := <-och.msgDistributionCh:
switch cmd.Cmd { switch cmd.Cmd {
@ -220,23 +221,23 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
return return
} }
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, &msgFromMQ) oldM = append(oldM, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM aggregationMsgs[string(consumerMessages[i].Key)] = oldM
} else { } else {
m := make([]*pbMsg.MsgDataToMQ, 0, 100) m := make([]*pbMsg.MsgDataToMQ, 0, 100)
m = append(m, &msgFromMQ) m = append(m, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = m aggregationMsgs[string(consumerMessages[i].Key)] = m
} }
} }
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) log.Debug(triggerID, "generate map list users len", len(aggregationMsgs))
for userID, v := range UserAggregationMsgs { for aggregationID, v := range aggregationMsgs {
if len(v) >= 0 { if len(v) >= 0 {
hashCode := getHashCode(userID) hashCode := getHashCode(aggregationID)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: v, triggerID: triggerID}}
//}(channelID, userID, v) //}(channelID, userID, v)
} }
} }

View File

@ -6,18 +6,25 @@ import (
commonDB "Open_IM/pkg/common/db" commonDB "Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
open_im_sdk "Open_IM/pkg/proto/sdk_ws" open_im_sdk "Open_IM/pkg/proto/sdk_ws"
) )
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) { func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAndMinSeqReq) (*open_im_sdk.GetMaxAndMinSeqResp, error) {
log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String()) log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String())
resp := new(open_im_sdk.GetMaxAndMinSeqResp)
m := make(map[string]*open_im_sdk.MaxAndMinSeq)
//seq, err := model.GetBiggestSeqFromReceive(in.UserID) //seq, err := model.GetBiggestSeqFromReceive(in.UserID)
maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID) maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID) minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
resp := new(pbMsg.GetMaxAndMinSeqResp)
if err1 == nil { if err1 == nil {
resp.MaxSeq = uint32(maxSeq) resp.MaxSeq = uint32(maxSeq)
for _, v := range in.GroupIDList {
x := new(open_im_sdk.MaxAndMinSeq)
maxSeq, _ := commonDB.DB.GetUserMaxSeq(v)
x.MaxSeq = uint32(maxSeq)
m[v] = x
}
resp.GroupMaxAndMinSeq = m
} else if err1 == redis.ErrNil { } else if err1 == redis.ErrNil {
resp.MaxSeq = 0 resp.MaxSeq = 0
} else { } else {
@ -39,6 +46,7 @@ func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeq
func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.PullMessageBySeqListReq) (*open_im_sdk.PullMessageBySeqListResp, error) { func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.PullMessageBySeqListReq) (*open_im_sdk.PullMessageBySeqListResp, error) {
log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String())
resp := new(open_im_sdk.PullMessageBySeqListResp) resp := new(open_im_sdk.PullMessageBySeqListResp)
m := make(map[string]*open_im_sdk.MsgDataList)
//msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) //msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID)
redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID)
if err != nil { if err != nil {
@ -60,6 +68,32 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull
} else { } else {
resp.List = redisMsgList resp.List = redisMsgList
} }
for k, v := range in.GroupSeqList {
x := new(open_im_sdk.MsgDataList)
redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID)
if err != nil {
if err != redis.ErrNil {
log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList)
} else {
log.Debug(in.OperationID, "get message from redis is nil", failedSeqList)
}
msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(k, failedSeqList, in.OperationID)
if err1 != nil {
log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error())
resp.ErrCode = 201
resp.ErrMsg = err.Error()
return resp, nil
} else {
redisMsgList = append(redisMsgList, msgList...)
x.MsgDataList = redisMsgList
m[k] = x
}
} else {
x.MsgDataList = redisMsgList
m[k] = x
}
}
resp.GroupMsgDataList = m
//respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) //respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
//respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) //respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
return resp, nil return resp, nil

View File

@ -50,9 +50,7 @@ type MsgCallBackResp struct {
} }
func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) { func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) {
if data.MsgData.SessionType == constant.GroupChatType { if data.MsgData.SessionType == constant.SingleChatType {
return true, 0, ""
}
log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify) log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID} reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
@ -90,6 +88,11 @@ func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string)
} else { } else {
return true, 0, "" return true, 0, ""
} }
} else {
return true, 0, ""
}
} }
func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
msg.ServerMsgID = GetMsgID(msg.SendID) msg.ServerMsgID = GetMsgID(msg.SendID)
@ -368,6 +371,34 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.SuperGroupChatType:
// callback
callbackResp := callbackBeforeSendSingleMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
// callback
callbackResp = callbackAfterSendSingleMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp)
}
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
default: default:
return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0) return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0)
} }

View File

@ -107,7 +107,7 @@ const (
//SessionType //SessionType
SingleChatType = 1 SingleChatType = 1
GroupChatType = 2 GroupChatType = 2
SuperGroupChatType = 3
NotificationChatType = 4 NotificationChatType = 4
//token //token
NormalToken = 0 NormalToken = 0

View File

@ -949,6 +949,10 @@ func getSeqUid(uid string, seq uint32) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocMsgNum
return indexGen(uid, seqSuffix) return indexGen(uid, seqSuffix)
} }
func getSeqSuperGroupID(groupID string, seq uint32) string {
seqSuffix := seq / singleGocMsgNum
return superGroupIndexGen(groupID, seqSuffix)
}
func GetSeqUid(uid string, seq uint32) string { func GetSeqUid(uid string, seq uint32) string {
return getSeqUid(uid, seq) return getSeqUid(uid, seq)
@ -986,3 +990,6 @@ func isNotContainInt32(target uint32, List []uint32) bool {
func indexGen(uid string, seqSuffix uint32) string { func indexGen(uid string, seqSuffix uint32) string {
return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
} }
func superGroupIndexGen(groupID string, seqSuffix uint32) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}