Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	internal/msgtransfer/online_history_msg_handler.go
#	internal/msgtransfer/online_msg_to_mongo_handler.go
#	internal/push/init.go
#	pkg/common/db/cache/redis.go
#	pkg/common/db/controller/msg.go
This commit is contained in:
withchao 2023-02-23 18:42:56 +08:00
commit 2f1c064413
25 changed files with 328 additions and 203 deletions

17
cmd/cmdutils/main.go Normal file
View File

@ -0,0 +1,17 @@
package main
import (
"Open_IM/internal/task"
"flag"
"fmt"
"time"
)
func main() {
var userID = flag.String("userID", "", "userID to clear msg and reset seq")
var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq")
var fixAllSeq = flag.Bool("fixAllSeq", false, "fix seq")
flag.Parse()
fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID)
task.FixSeq(*userID, *workingGroupID, *fixAllSeq)
}

View File

@ -1,16 +1,14 @@
package main
import (
"Open_IM/internal/crontask"
"flag"
"Open_IM/internal/task"
"fmt"
"time"
)
func main() {
var userID = flag.String("userID", "", "userID to clear msg and reset seq")
var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq")
flag.Parse()
fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID)
cronTask.StartCronTask(*userID, *workingGroupID)
fmt.Println(time.Now(), "start cronTask")
if err := task.StartCronTask(); err != nil {
panic(err.Error())
}
}

View File

@ -20,7 +20,9 @@ func main() {
log.NewPrivateLog(constant.LogFileName)
fmt.Println("start push rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
pusher := push.Push{}
pusher.Init(*rpcPort)
if err := pusher.Init(*rpcPort); err != nil {
panic(err.Error())
}
pusher.Run(*prometheusPort)
wg.Wait()
}

View File

@ -1,43 +0,0 @@
package cronTask
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"fmt"
"time"
"github.com/robfig/cron/v3"
)
const cronTaskOperationID = "cronTaskOperationID-"
const moduleName = "cron"
func StartCronTask(userID, workingGroupID string) {
log.NewPrivateLog(moduleName)
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
if userID != "" {
operationID := getCronTaskOperationID()
ClearUsersMsg(operationID, []string{userID})
}
if workingGroupID != "" {
operationID := getCronTaskOperationID()
ClearSuperGroupMsg(operationID, []string{workingGroupID})
}
if userID != "" || workingGroupID != "" {
fmt.Println("clear msg finished")
return
}
c := cron.New()
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
panic(err)
}
c.Start()
fmt.Println("start cron task success")
for {
time.Sleep(10 * time.Second)
}
}

View File

@ -56,8 +56,8 @@ type OnlineHistoryRedisConsumerHandler struct {
producerToModify *kafka.Producer
producerToMongo *kafka.Producer
msgInterface controller.MsgInterface
cache cache.MsgCache
msgDatabase controller.MsgDatabase
cache cache.Cache
}
func (och *OnlineHistoryRedisConsumerHandler) Init() {
@ -113,7 +113,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
if len(storageMsgList) > 0 {
lastSeq, err := och.msgInterface.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
if err != nil {
och.singleMsgFailedCountMutex.Lock()
och.singleMsgFailedCount += uint64(len(storageMsgList))

View File

@ -18,8 +18,8 @@ import (
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
msgInterface controller.MsgInterface
cache cache.MsgCache
msgDatabase controller.MsgDatabase
cache cache.Cache
}
func (mc *OnlineHistoryMongoConsumerHandler) Init() {
@ -39,12 +39,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
ctx := context.Background()
tracelog.SetOperationID(ctx, msgFromMQ.TriggerID)
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
err = mc.msgInterface.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
err = mc.msgInterface.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
@ -62,8 +62,8 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if totalUnExistSeqs, err := mc.msgInterface.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.SeqList); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
}
}

View File

@ -9,6 +9,7 @@ import (
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils/splitter"
"github.com/go-redis/redis/v8"
"sync"
"Open_IM/pkg/utils"
"context"
@ -64,13 +65,17 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
maxNum := 999
if len(userIDs) > maxNum {
s := splitter.NewSplitter(maxNum, userIDs)
wg := sync.WaitGroup{}
wg.Add(len(s.GetSplitResult()))
for i, v := range s.GetSplitResult() {
go func(index int, userIDs []string) {
defer wg.Done()
if err = g.batchPush(ctx, token, userIDs, pushReq); err != nil {
log.NewError(tracelog.GetOperationID(ctx), "batchPush failed", i, token, pushReq)
}
}(i, v.Item)
}
wg.Wait()
} else {
err = g.batchPush(ctx, token, userIDs, pushReq)
}

View File

@ -25,9 +25,12 @@ type Push struct {
successCount uint64
}
func (p *Push) Init(rpcPort int) {
var cacheInterface cache.MsgCache
func (p *Push) Init(rpcPort int) error {
redisClient, err := cache.NewRedis()
if err != nil {
return err
}
var cacheInterface cache.Cache = redisClient
p.rpcServer.Init(rpcPort, cacheInterface)
p.pushCh.Init()
statistics.NewStatistics(&p.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
@ -40,6 +43,7 @@ func (p *Push) Init(rpcPort int) {
if config.Config.Push.Fcm.Enable {
p.offlinePusher = fcm.NewClient(cacheInterface)
}
return nil
}
func (p *Push) initPrometheus() {

View File

@ -11,15 +11,18 @@ import (
"Open_IM/pkg/common/constant"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
pbChat "Open_IM/pkg/proto/msg"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"context"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
type ConsumerHandler struct {
pushConsumerGroup *kfk.MConsumerGroup
pusher Pusher
}
func (c *ConsumerHandler) Init() {
@ -27,6 +30,7 @@ func (c *ConsumerHandler) Init() {
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
}
func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
msgFromMQ := pbChat.PushMsgDataToMQ{}
@ -43,11 +47,17 @@ func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
if nowSec-sec > 10 {
return
}
ctx := context.Background()
tracelog.SetOperationID(ctx, "")
var err error
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
MsgToSuperGroupUser(pbData)
err = c.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData)
default:
MsgToUser(pbData)
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
log.NewError("", "push failed", *pbData)
}
}
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

View File

@ -21,16 +21,13 @@ import (
type RPCServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
push controller.PushInterface
pushInterface controller.PushInterface
pusher Pusher
}
func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) {
r.rpcPort = rpcPort
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
}
func (r *RPCServer) run() {
@ -84,13 +81,13 @@ func (r *RPCServer) run() {
func (r *RPCServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) {
switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType:
MsgToSuperGroupUser(pbData)
err = r.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData)
default:
MsgToUser(pbData)
err = r.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
return &pbPush.PushMsgResp{}, nil
return &pbPush.PushMsgResp{}, err
}
func (r *RPCServer) DelUserPushToken(ctx context.Context, req *pbPush.DelUserPushTokenReq) (resp *pbPush.DelUserPushTokenResp, err error) {
return &pbPush.DelUserPushTokenResp{}, r.push.DelFcmToken(ctx, req.UserID, int(req.PlatformID))
return &pbPush.DelUserPushTokenResp{}, r.pushInterface.DelFcmToken(ctx, req.UserID, int(req.PlatformID))
}

View File

@ -79,7 +79,6 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
}
err = p.OfflinePushMsg(ctx, userID, msg, userIDs)
if err != nil {
log.NewError(operationID, "OfflinePushMsg failed", userID)
return err
}
}

View File

@ -1,31 +0,0 @@
package push
import (
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth"
"Open_IM/pkg/common/config"
)
var badgeType = -2
var iosAcceptId = auth.Auther{AccessID: config.Config.Push.Tpns.Ios.AccessID, SecretKey: config.Config.Push.Tpns.Ios.SecretKey}
func IOSAccountListPush(accounts []string, title, content, jsonCustomContent string) {
var iosMessage = tpns.Message{
Title: title,
Content: content,
IOS: &tpns.IOSParams{
Aps: &tpns.Aps{
BadgeType: &badgeType,
Sound: "default",
Category: "INVITE_CATEGORY",
},
CustomContent: jsonCustomContent,
//CustomContent: `"{"key\":\"value\"}"`,
},
}
pushReq, reqBody, err := req.NewListAccountPush(accounts, iosMessage)
if err != nil {
return
}
iosAcceptId.Auth(pushReq, auth.UseSignAuthored, iosAcceptId, reqBody)
common.PushAndGetResult(pushReq)
}

View File

@ -51,7 +51,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
return nil, err
}
if err := CallbackBeforeAddFriend(ctx, req); err != nil {
if err := CallbackBeforeAddFriend(ctx, req); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
if req.ToUserID == req.FromUserID {

View File

@ -142,7 +142,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
if err != nil {
return nil, err
}
if err := CallbackBeforeCreateGroup(ctx, req); err != nil {
if err := CallbackBeforeCreateGroup(ctx, req); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
var groupMembers []*relationTb.GroupMemberModel
@ -158,7 +158,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
groupMember.OperatorUserID = tracelog.GetOpUserID(ctx)
groupMember.JoinSource = constant.JoinByInvitation
groupMember.InviterUserID = tracelog.GetOpUserID(ctx)
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil {
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != constant.ErrCallbackContinue {
return err
}
groupMembers = append(groupMembers, groupMember)
@ -318,7 +318,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
member.OperatorUserID = opUserID
member.InviterUserID = opUserID
member.JoinSource = constant.JoinByInvitation
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil {
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
groupMembers = append(groupMembers, member)
@ -601,7 +601,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
OperatorUserID: tracelog.GetOpUserID(ctx),
Ex: groupRequest.Ex,
}
if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil {
if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
}
@ -645,7 +645,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
groupMember.OperatorUserID = tracelog.GetOpUserID(ctx)
groupMember.JoinSource = constant.JoinByInvitation
groupMember.InviterUserID = tracelog.GetOpUserID(ctx)
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil {
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil {

View File

@ -16,7 +16,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
resp = &msg.SendMsgResp{}
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
// callback
if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil {
if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
@ -61,7 +61,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter)
if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil {
if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
_, err = m.messageVerification(ctx, req)
@ -86,7 +86,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
}
}
err = CallbackAfterSendSingleMsg(ctx, req)
if err != nil {
if err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter)
@ -100,7 +100,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (
// callback
promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter)
err = CallbackBeforeSendGroupMsg(ctx, req)
if err != nil {
if err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
@ -235,7 +235,7 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg
return nil, constant.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := CallbackMsgModify(ctx, req); err != nil {
if err := CallbackMsgModify(ctx, req); err != nil && err != constant.ErrCallbackContinue {
return nil, err
}
switch req.MsgData.SessionType {

View File

@ -1,4 +1,4 @@
package cronTask
package task
import (
"Open_IM/pkg/common/config"
@ -8,20 +8,22 @@ import (
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"fmt"
"github.com/go-redis/redis/v8"
"math"
)
type ClearMsgTool struct {
msgInterface controller.MsgInterface
type msgTool struct {
msgInterface controller.MsgDatabase
userInterface controller.UserDatabase
groupInterface controller.GroupDatabase
}
func (c *ClearMsgTool) getCronTaskOperationID() string {
func (c *msgTool) getCronTaskOperationID() string {
return cronTaskOperationID + utils.OperationIDGenerator()
}
func (c *ClearMsgTool) ClearAll() {
func (c *msgTool) ClearAll() {
operationID := c.getCronTaskOperationID()
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
@ -43,7 +45,7 @@ func (c *ClearMsgTool) ClearAll() {
log.NewInfo(operationID, "============================ start del cron finished ============================")
}
func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
func (c *msgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
for _, userID := range userIDList {
if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
@ -58,7 +60,7 @@ func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
}
}
func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) {
func (c *msgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) {
for _, groupID := range workingGroupIDList {
userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID)
if err != nil {
@ -73,12 +75,20 @@ func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDLis
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID)
continue
}
c.FixGroupUserSeq(ctx, userIDs, groupID)
for _, userID := range userIDs {
minSeqCache, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil {
log.NewError(tracelog.GetOperationID(ctx), "GetGroupUserMinSeq failed", groupID, userID)
continue
}
c.FixGroupUserSeq(ctx, userID, groupID, minSeqCache, maxSeqCache)
}
c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
}
}
func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) {
func (c *msgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) {
if minSeqCache > maxSeqCache {
if err := c.msgInterface.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil {
log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache)
@ -88,7 +98,7 @@ func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCach
}
}
func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) {
func (c *msgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) {
if minSeqCache > maxSeqCache {
if err := c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache)
@ -98,8 +108,63 @@ func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, group
}
}
func (c *ClearMsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) {
func (c *msgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) {
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType)
}
}
func (c *msgTool) FixAllSeq(ctx context.Context) {
userIDs, err := c.userInterface.GetAllUserID(ctx)
if err != nil {
panic(err.Error())
}
for _, userID := range userIDs {
userCurrentMinSeq, err := c.msgInterface.GetUserMinSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
userCurrentMaxSeq, err := c.msgInterface.GetUserMaxSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
if userCurrentMinSeq > userCurrentMaxSeq {
if err = c.msgInterface.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
}
fmt.Println("fix", userID, userCurrentMaxSeq)
}
}
fmt.Println("fix users seq success")
groupIDs, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err != nil {
panic(err.Error())
}
for _, groupID := range groupIDs {
maxSeq, err := c.msgInterface.GetGroupMaxSeq(ctx, groupID)
if err != nil {
fmt.Println("GetGroupMaxSeq failed", groupID)
continue
}
userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID)
if err != nil {
fmt.Println("get groupID", groupID, "failed, try again later")
continue
}
for _, userID := range userIDs {
userMinSeq, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil && err != redis.Nil {
fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
continue
}
if userMinSeq > maxSeq {
if err = c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil {
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
}
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)
}
}
}
fmt.Println("fix all seq finished")
}

View File

@ -1,4 +1,4 @@
package cronTask
package task
import (
"Open_IM/pkg/common/constant"

View File

@ -0,0 +1,56 @@
package task
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"fmt"
"time"
"github.com/robfig/cron/v3"
)
const cronTaskOperationID = "cronTaskOperationID-"
const moduleName = "cron"
func StartCronTask() error {
log.NewPrivateLog(moduleName)
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
clearCronTask := msgTool{}
ctx := context.Background()
operationID := clearCronTask.getCronTaskOperationID()
tracelog.SetOperationID(ctx, operationID)
c := cron.New()
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearCronTask.ClearAll)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
return err
}
c.Start()
fmt.Println("start cron task success")
for {
time.Sleep(10 * time.Second)
}
}
func FixSeq(userID, workingGroupID string, fixAllSeq bool) {
log.NewPrivateLog(moduleName)
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
clearCronTask := msgTool{}
ctx := context.Background()
operationID := clearCronTask.getCronTaskOperationID()
tracelog.SetOperationID(ctx, operationID)
if userID != "" {
clearCronTask.ClearUsersMsg(ctx, []string{userID})
}
if workingGroupID != "" {
clearCronTask.ClearSuperGroupMsg(ctx, []string{workingGroupID})
}
if fixAllSeq {
clearCronTask.FixAllSeq(ctx)
}
fmt.Println("fix seq finished")
}

View File

@ -18,6 +18,88 @@ import (
"github.com/golang/protobuf/proto"
)
const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
)
type Cache interface {
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, status int32) error
GetSendMsgStatus(ctx context.Context) (int, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
// native redis operate
//func NewRedis() *RedisClient {
// o := &RedisClient{}
// o.InitRedis()
// return o
//}
func NewRedis() (*RedisClient, error) {
var rdb redis.UniversalClient
if config.Config.Redis.EnableCluster {
@ -131,13 +213,13 @@ func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq
}
// Store userid and platform class to redis
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platform string, token string, flag int) error {
key := uidPidToken + userID + ":" + platform
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HSet(context.Background(), key, token, flag).Err()
}
//key:userID+platform-> <token, flag>
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error) {
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID int) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
m, err := r.rdb.HGetAll(context.Background(), key).Result()
mm := make(map[string]int)
@ -174,7 +256,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) {
func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
@ -183,25 +265,25 @@ func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, se
if err != redis.Nil {
err2 = err
}
failedSeqList = append(failedSeqList, v)
failedSeqs = append(failedSeqs, v)
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
err2 = err
failedSeqList = append(failedSeqList, v)
failedSeqs = append(failedSeqs, v)
} else {
seqMsg = append(seqMsg, &msg)
seqMsgs = append(seqMsgs, &msg)
}
}
}
return seqMsg, failedSeqList, err2
return seqMsgs, failedSeqs, err2
}
func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) {
func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) {
pipe := r.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
var failedMsgs []pbChat.MsgDataToMQ
for _, msg := range msgs {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
@ -210,17 +292,17 @@ func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgL
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
failedList = append(failedList, *msg)
failedMsgs = append(failedMsgs, *msg)
}
}
if len(failedList) != 0 {
return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList))
if len(failedMsgs) != 0 {
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList))
}
_, err := pipe.Exec(ctx)
return 0, err
}
func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
for _, msg := range msgList {
func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error {
for _, msg := range msgs {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
err := r.rdb.Del(ctx, key).Err()
if err != nil {

View File

@ -1,7 +1,9 @@
package localcache
import (
discoveryRegistry "Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/common/config"
"Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/conversation"
"context"
"sync"
)
@ -13,10 +15,10 @@ type ConversationLocalCacheInterface interface {
type ConversationLocalCache struct {
lock sync.Mutex
SuperGroupRecvMsgNotNotifyUserIDs map[string][]string
client discoveryRegistry.SvcDiscoveryRegistry
client discoveryregistry.SvcDiscoveryRegistry
}
func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) ConversationLocalCache {
func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) ConversationLocalCache {
return ConversationLocalCache{
SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0),
client: client,
@ -24,5 +26,16 @@ func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) Co
}
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
return []string{}, nil
conn, err := g.client.GetConn(config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
return nil, err
}
client := conversation.NewConversationClient(conn)
resp, err := client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{
GroupID: groupID,
})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
}

View File

@ -100,5 +100,5 @@ func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs(ctx context.Context, grou
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userIDs", userIDs)
}()
return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "")
return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "")
}

View File

@ -63,10 +63,10 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
log.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
return -1, -1, errors.New("key or value == 0")
}
a, b, c := p.producer.SendMessage(kMsg)
partition, offset, err := p.producer.SendMessage(kMsg)
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
if c == nil {
if err == nil {
prome.PromeInc(prome.SendMsgCounter)
}
return a, b, utils.Wrap(c, "")
return partition, offset, utils.Wrap(err, "")
}

View File

@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/peer"
)
func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
func UnaryServerInterceptorPrometheus(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
remote, _ := peer.FromContext(ctx)
remoteAddr := remote.Addr.String()

View File

@ -9,37 +9,4 @@ type SvcDiscoveryRegistry interface {
UnRegister() error
GetConns(serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error)
GetConn(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
//RegisterConf(conf []byte) error
//LoadConf() ([]byte, error)
}
//func registerConf(key, conf string) {
// etcdAddr := strings.Join(config.Config.Etcd.EtcdAddr, ",")
// cli, err := clientv3.New(clientv3.Config{
// Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second})
//
// if err != nil {
// panic(err.Error())
// }
// //lease
// if _, err := cli.Put(context.Background(), key, conf); err != nil {
// fmt.Println("panic, params: ")
// panic(err.Error())
// }
//}
//
//func RegisterConf() {
// bytes, err := yaml.Marshal(config.Config)
// if err != nil {
// panic(err.Error())
// }
// secretMD5 := utils.Md5(config.Config.Etcd.Secret)
// confBytes, err := utils.AesEncrypt(bytes, []byte(secretMD5[0:16]))
// if err != nil {
// panic(err.Error())
// }
// fmt.Println("start register", secretMD5, getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName))
// registerConf(getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName), string(confBytes))
// fmt.Println("etcd register conf ok")
//}

View File

@ -1,16 +0,0 @@
package discoveryregistry
import "google.golang.org/grpc"
type Robin struct {
next int
}
func (r *Robin) Robin(slice []*grpc.ClientConn) int {
index := r.next
r.next += 1
if r.next > len(slice)-1 {
r.next = 0
}
return index
}