open-im-server/internal/msgtransfer/persistent_msg_handler.go
wangchuxiao 759432794f errcode
2023-02-09 20:36:34 +08:00

113 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
** description("").
** copyright('tuoyun,www.tuoyun.net').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/11 15:37).
*/
package msgtransfer
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_msg_model"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
promePkg "Open_IM/pkg/common/prometheus"
)
type PersistentConsumerHandler struct {
msgHandle map[string]fcb
persistentConsumerGroup *kfk.MConsumerGroup
}
func (pc *PersistentConsumerHandler) Init() {
pc.msgHandle = make(map[string]fcb)
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
}
func initPrometheus() {
promePkg.NewSeqGetSuccessCounter()
promePkg.NewSeqGetFailedCounter()
promePkg.NewSeqSetSuccessCounter()
promePkg.NewSeqSetFailedCounter()
promePkg.NewMsgInsertRedisSuccessCounter()
promePkg.NewMsgInsertRedisFailedCounter()
promePkg.NewMsgInsertMongoSuccessCounter()
promePkg.NewMsgInsertMongoFailedCounter()
}
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
return
}
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
//Only process receiver data
if isPersist {
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType, constant.NotificationChatType:
if msgKey == msgFromMQ.MsgData.RecvID {
tag = true
}
case constant.GroupChatType:
if msgKey == msgFromMQ.MsgData.SendID {
tag = true
}
case constant.SuperGroupChatType:
tag = true
}
if tag {
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
return
}
}
}
}
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
}
sess.MarkMessage(msg, "")
}
return nil
}
1. 请求1 group Rpc 2. 请求2 发消息 sendMsg rpc
1 更改数据库
2. 删除哈希缓存
检测到哈希变了 群成员还没来得及删除有问题
3. 删除群成员缓存
4. 删除对应群成员加群缓存
5. 删除数量缓存