From 4fc00109ef58ff05e383c6db728d110f19ce1a38 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 23 Feb 2023 17:28:57 +0800 Subject: [PATCH 1/2] push --- cmd/push/main.go | 4 ++- internal/crontask/clear_msg.go | 8 +++-- .../msgtransfer/online_history_msg_handler.go | 6 ++-- .../online_msg_to_mongo_handler.go | 10 +++--- internal/push/getui/push.go | 5 +++ internal/push/init.go | 10 ++++-- internal/push/push_handler.go | 14 ++++++-- internal/push/push_rpc_server.go | 15 ++++----- internal/push/push_to_client.go | 1 - internal/push/tpns.go | 31 ----------------- internal/rpc/friend/friend.go | 2 +- internal/rpc/group/group.go | 10 +++--- internal/rpc/msg/send_pull.go | 10 +++--- pkg/common/db/cache/redis.go | 10 +++--- pkg/common/db/controller/msg.go | 10 ++---- pkg/common/db/localcache/conversation.go | 21 +++++++++--- pkg/common/db/relation/conversation_model.go | 2 +- pkg/common/kafka/producer.go | 6 ++-- pkg/common/prome/grpc.go | 2 +- pkg/discoveryregistry/discovery_register.go | 33 ------------------- pkg/discoveryregistry/startegy.go | 16 --------- 21 files changed, 87 insertions(+), 139 deletions(-) delete mode 100644 internal/push/tpns.go delete mode 100644 pkg/discoveryregistry/startegy.go diff --git a/cmd/push/main.go b/cmd/push/main.go index 66cbd98c5..40e774608 100644 --- a/cmd/push/main.go +++ b/cmd/push/main.go @@ -20,7 +20,9 @@ func main() { log.NewPrivateLog(constant.LogFileName) fmt.Println("start push rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") pusher := push.Push{} - pusher.Init(*rpcPort) + if err := pusher.Init(*rpcPort); err != nil { + panic(err.Error()) + } pusher.Run(*prometheusPort) wg.Wait() } diff --git a/internal/crontask/clear_msg.go b/internal/crontask/clear_msg.go index 051161143..d6e30f49b 100644 --- a/internal/crontask/clear_msg.go +++ b/internal/crontask/clear_msg.go @@ -12,7 +12,7 @@ import ( ) type ClearMsgTool struct { - msgInterface controller.MsgInterface + msgInterface controller.MsgDatabase userInterface controller.UserDatabase groupInterface controller.GroupDatabase } @@ -73,7 +73,11 @@ func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDLis log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID) continue } - c.FixGroupUserSeq(ctx, userIDs, groupID) + //for _, userID := range userIDs { + // c.msgInterface.getgroup + // c.FixGroupUserSeq(ctx, userID, groupID, ) + // + //} c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) } } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b533afd46..03d0173ff 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -56,8 +56,8 @@ type OnlineHistoryRedisConsumerHandler struct { producerToModify *kafka.Producer producerToMongo *kafka.Producer - msgInterface controller.MsgInterface - cache cache.Cache + msgDatabase controller.MsgDatabase + cache cache.Cache } func (och *OnlineHistoryRedisConsumerHandler) Init() { @@ -113,7 +113,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) if len(storageMsgList) > 0 { - lastSeq, err := och.msgInterface.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList) + lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList) if err != nil { och.singleMsgFailedCountMutex.Lock() och.singleMsgFailedCount += uint64(len(storageMsgList)) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 9d1202321..974dd0310 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -18,7 +18,7 @@ import ( type OnlineHistoryMongoConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup - msgInterface controller.MsgInterface + msgDatabase controller.MsgDatabase cache cache.Cache } @@ -39,12 +39,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con ctx := context.Background() tracelog.SetOperationID(ctx, msgFromMQ.TriggerID) //err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq) - err = mc.msgInterface.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq) + err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq) if err != nil { log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID) } //err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID()) - err = mc.msgInterface.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList) + err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList) if err != nil { log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID) } @@ -62,8 +62,8 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String()) continue } - if totalUnExistSeqs, err := mc.msgInterface.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.SeqList); err != nil { - log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs) + if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil { + log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs) } } diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 06b25252d..5827a2447 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -9,6 +9,7 @@ import ( "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils/splitter" "github.com/go-redis/redis/v8" + "sync" "Open_IM/pkg/utils" "context" @@ -64,13 +65,17 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri maxNum := 999 if len(userIDs) > maxNum { s := splitter.NewSplitter(maxNum, userIDs) + wg := sync.WaitGroup{} + wg.Add(len(s.GetSplitResult())) for i, v := range s.GetSplitResult() { go func(index int, userIDs []string) { + defer wg.Done() if err = g.batchPush(ctx, token, userIDs, pushReq); err != nil { log.NewError(tracelog.GetOperationID(ctx), "batchPush failed", i, token, pushReq) } }(i, v.Item) } + wg.Wait() } else { err = g.batchPush(ctx, token, userIDs, pushReq) } diff --git a/internal/push/init.go b/internal/push/init.go index 51fcce428..60b9dd612 100644 --- a/internal/push/init.go +++ b/internal/push/init.go @@ -25,9 +25,12 @@ type Push struct { successCount uint64 } -func (p *Push) Init(rpcPort int) { - var cacheInterface cache.Cache - +func (p *Push) Init(rpcPort int) error { + redisClient, err := cache.NewRedis() + if err != nil { + return err + } + var cacheInterface cache.Cache = redisClient p.rpcServer.Init(rpcPort, cacheInterface) p.pushCh.Init() statistics.NewStatistics(&p.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) @@ -40,6 +43,7 @@ func (p *Push) Init(rpcPort int) { if config.Config.Push.Fcm.Enable { p.offlinePusher = fcm.NewClient(cacheInterface) } + return nil } func (p *Push) initPrometheus() { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 13dad5704..b49466b01 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -11,15 +11,18 @@ import ( "Open_IM/pkg/common/constant" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/tracelog" pbChat "Open_IM/pkg/proto/msg" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" + "context" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) type ConsumerHandler struct { pushConsumerGroup *kfk.MConsumerGroup + pusher Pusher } func (c *ConsumerHandler) Init() { @@ -27,6 +30,7 @@ func (c *ConsumerHandler) Init() { OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) } + func (c *ConsumerHandler) handleMs2PsChat(msg []byte) { log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg)) msgFromMQ := pbChat.PushMsgDataToMQ{} @@ -43,11 +47,17 @@ func (c *ConsumerHandler) handleMs2PsChat(msg []byte) { if nowSec-sec > 10 { return } + ctx := context.Background() + tracelog.SetOperationID(ctx, "") + var err error switch msgFromMQ.MsgData.SessionType { case constant.SuperGroupChatType: - MsgToSuperGroupUser(pbData) + err = c.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData) default: - MsgToUser(pbData) + err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData) + } + if err != nil { + log.NewError("", "push failed", *pbData) } } func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6efcc62dc..a17df5291 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -21,16 +21,13 @@ import ( type RPCServer struct { rpcPort int rpcRegisterName string - etcdSchema string - etcdAddr []string - push controller.PushInterface + pushInterface controller.PushInterface + pusher Pusher } func (r *RPCServer) Init(rpcPort int, cache cache.Cache) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName - r.etcdSchema = config.Config.Etcd.EtcdSchema - r.etcdAddr = config.Config.Etcd.EtcdAddr } func (r *RPCServer) run() { @@ -84,13 +81,13 @@ func (r *RPCServer) run() { func (r *RPCServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) { switch pbData.MsgData.SessionType { case constant.SuperGroupChatType: - MsgToSuperGroupUser(pbData) + err = r.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData) default: - MsgToUser(pbData) + err = r.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData) } - return &pbPush.PushMsgResp{}, nil + return &pbPush.PushMsgResp{}, err } func (r *RPCServer) DelUserPushToken(ctx context.Context, req *pbPush.DelUserPushTokenReq) (resp *pbPush.DelUserPushTokenResp, err error) { - return &pbPush.DelUserPushTokenResp{}, r.push.DelFcmToken(ctx, req.UserID, int(req.PlatformID)) + return &pbPush.DelUserPushTokenResp{}, r.pushInterface.DelFcmToken(ctx, req.UserID, int(req.PlatformID)) } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 3500ed8cc..5e72f490b 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -79,7 +79,6 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat } err = p.OfflinePushMsg(ctx, userID, msg, userIDs) if err != nil { - log.NewError(operationID, "OfflinePushMsg failed", userID) return err } } diff --git a/internal/push/tpns.go b/internal/push/tpns.go deleted file mode 100644 index de8665781..000000000 --- a/internal/push/tpns.go +++ /dev/null @@ -1,31 +0,0 @@ -package push - -import ( - "Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth" - "Open_IM/pkg/common/config" -) - -var badgeType = -2 -var iosAcceptId = auth.Auther{AccessID: config.Config.Push.Tpns.Ios.AccessID, SecretKey: config.Config.Push.Tpns.Ios.SecretKey} - -func IOSAccountListPush(accounts []string, title, content, jsonCustomContent string) { - var iosMessage = tpns.Message{ - Title: title, - Content: content, - IOS: &tpns.IOSParams{ - Aps: &tpns.Aps{ - BadgeType: &badgeType, - Sound: "default", - Category: "INVITE_CATEGORY", - }, - CustomContent: jsonCustomContent, - //CustomContent: `"{"key\":\"value\"}"`, - }, - } - pushReq, reqBody, err := req.NewListAccountPush(accounts, iosMessage) - if err != nil { - return - } - iosAcceptId.Auth(pushReq, auth.UseSignAuthored, iosAcceptId, reqBody) - common.PushAndGetResult(pushReq) -} diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 3767dd076..e7be6221f 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -51,7 +51,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil { return nil, err } - if err := CallbackBeforeAddFriend(ctx, req); err != nil { + if err := CallbackBeforeAddFriend(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } if req.ToUserID == req.FromUserID { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 17eb81180..b0d8db256 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -142,7 +142,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR if err != nil { return nil, err } - if err := CallbackBeforeCreateGroup(ctx, req); err != nil { + if err := CallbackBeforeCreateGroup(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } var groupMembers []*relationTb.GroupMemberModel @@ -158,7 +158,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR groupMember.OperatorUserID = tracelog.GetOpUserID(ctx) groupMember.JoinSource = constant.JoinByInvitation groupMember.InviterUserID = tracelog.GetOpUserID(ctx) - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != constant.ErrCallbackContinue { return err } groupMembers = append(groupMembers, groupMember) @@ -318,7 +318,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite member.OperatorUserID = opUserID member.InviterUserID = opUserID member.JoinSource = constant.JoinByInvitation - if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != constant.ErrCallbackContinue { return nil, err } groupMembers = append(groupMembers, member) @@ -601,7 +601,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup OperatorUserID: tracelog.GetOpUserID(ctx), Ex: groupRequest.Ex, } - if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { + if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != constant.ErrCallbackContinue { return nil, err } } @@ -645,7 +645,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) groupMember.OperatorUserID = tracelog.GetOpUserID(ctx) groupMember.JoinSource = constant.JoinByInvitation groupMember.InviterUserID = tracelog.GetOpUserID(ctx) - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != constant.ErrCallbackContinue { return nil, err } if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil { diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 06b3b8c01..3f1be0d04 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -16,7 +16,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR resp = &msg.SendMsgResp{} promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback - if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil { + if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } @@ -61,7 +61,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) - if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil { + if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } _, err = m.messageVerification(ctx, req) @@ -86,7 +86,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) } } err = CallbackAfterSendSingleMsg(ctx, req) - if err != nil { + if err != nil && err != constant.ErrCallbackContinue { return nil, err } promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) @@ -100,7 +100,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( // callback promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) err = CallbackBeforeSendGroupMsg(ctx, req) - if err != nil { + if err != nil && err != constant.ErrCallbackContinue { return nil, err } @@ -235,7 +235,7 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg return nil, constant.ErrMessageHasReadDisable.Wrap() } m.encapsulateMsgData(req.MsgData) - if err := CallbackMsgModify(ctx, req); err != nil { + if err := CallbackMsgModify(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } switch req.MsgData.SessionType { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 05129f93a..6448ff024 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -59,7 +59,7 @@ type Cache interface { SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error - GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error CleanUpOneUserAllMsg(ctx context.Context, userID string) error @@ -213,13 +213,13 @@ func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq } // Store userid and platform class to redis -func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platform string, token string, flag int) error { - key := uidPidToken + userID + ":" + platform +func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) return r.rdb.HSet(context.Background(), key, token, flag).Err() } //key:userID+platform-> -func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error) { +func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID int) (map[string]int, error) { key := uidPidToken + userID + ":" + platformID m, err := r.rdb.HGetAll(context.Background(), key).Result() mm := make(map[string]int) @@ -256,7 +256,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl return r.rdb.HDel(context.Background(), key, fields...).Err() } -func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) { +func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index df306f3ca..5c584e18b 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -161,7 +161,6 @@ type MsgDatabaseInterface interface { GetUserMinSeq(ctx context.Context, userID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) - GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) } type MsgDatabase struct { mgo unRelationTb.MsgDocModelInterface @@ -249,11 +248,6 @@ func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int6 panic("implement me") } -func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) { - //TODO implement me - panic("implement me") -} - func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { return &MsgDatabase{} } @@ -530,7 +524,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ } func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs) + successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) @@ -551,7 +545,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i } func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs) + successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 2fba4a7cf..592f1e3f2 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -1,7 +1,9 @@ package localcache import ( - discoveryRegistry "Open_IM/pkg/discoveryregistry" + "Open_IM/pkg/common/config" + "Open_IM/pkg/discoveryregistry" + "Open_IM/pkg/proto/conversation" "context" "sync" ) @@ -13,10 +15,10 @@ type ConversationLocalCacheInterface interface { type ConversationLocalCache struct { lock sync.Mutex SuperGroupRecvMsgNotNotifyUserIDs map[string][]string - client discoveryRegistry.SvcDiscoveryRegistry + client discoveryregistry.SvcDiscoveryRegistry } -func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) ConversationLocalCache { +func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) ConversationLocalCache { return ConversationLocalCache{ SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0), client: client, @@ -24,5 +26,16 @@ func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) Co } func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - return []string{}, nil + conn, err := g.client.GetConn(config.Config.RpcRegisterName.OpenImConversationName) + if err != nil { + return nil, err + } + client := conversation.NewConversationClient(conn) + resp, err := client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{ + GroupID: groupID, + }) + if err != nil { + return nil, err + } + return resp.UserIDs, nil } diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 14d72ea03..c360fa393 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -100,5 +100,5 @@ func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs(ctx context.Context, grou defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userIDs", userIDs) }() - return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "") + return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "") } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index b02fe3ef3..61a137458 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -63,10 +63,10 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) log.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) return -1, -1, errors.New("key or value == 0") } - a, b, c := p.producer.SendMessage(kMsg) + partition, offset, err := p.producer.SendMessage(kMsg) log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) - if c == nil { + if err == nil { prome.PromeInc(prome.SendMsgCounter) } - return a, b, utils.Wrap(c, "") + return partition, offset, utils.Wrap(err, "") } diff --git a/pkg/common/prome/grpc.go b/pkg/common/prome/grpc.go index fe0a633fa..8542e7e9f 100644 --- a/pkg/common/prome/grpc.go +++ b/pkg/common/prome/grpc.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc/peer" ) -func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { +func UnaryServerInterceptorPrometheus(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { remote, _ := peer.FromContext(ctx) remoteAddr := remote.Addr.String() diff --git a/pkg/discoveryregistry/discovery_register.go b/pkg/discoveryregistry/discovery_register.go index 72eca2a67..68b6be1a0 100644 --- a/pkg/discoveryregistry/discovery_register.go +++ b/pkg/discoveryregistry/discovery_register.go @@ -9,37 +9,4 @@ type SvcDiscoveryRegistry interface { UnRegister() error GetConns(serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) GetConn(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) - - //RegisterConf(conf []byte) error - //LoadConf() ([]byte, error) } - -//func registerConf(key, conf string) { -// etcdAddr := strings.Join(config.Config.Etcd.EtcdAddr, ",") -// cli, err := clientv3.New(clientv3.Config{ -// Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second}) -// -// if err != nil { -// panic(err.Error()) -// } -// //lease -// if _, err := cli.Put(context.Background(), key, conf); err != nil { -// fmt.Println("panic, params: ") -// panic(err.Error()) -// } -//} -// -//func RegisterConf() { -// bytes, err := yaml.Marshal(config.Config) -// if err != nil { -// panic(err.Error()) -// } -// secretMD5 := utils.Md5(config.Config.Etcd.Secret) -// confBytes, err := utils.AesEncrypt(bytes, []byte(secretMD5[0:16])) -// if err != nil { -// panic(err.Error()) -// } -// fmt.Println("start register", secretMD5, getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName)) -// registerConf(getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName), string(confBytes)) -// fmt.Println("etcd register conf ok") -//} diff --git a/pkg/discoveryregistry/startegy.go b/pkg/discoveryregistry/startegy.go deleted file mode 100644 index f54d7d80d..000000000 --- a/pkg/discoveryregistry/startegy.go +++ /dev/null @@ -1,16 +0,0 @@ -package discoveryregistry - -import "google.golang.org/grpc" - -type Robin struct { - next int -} - -func (r *Robin) Robin(slice []*grpc.ClientConn) int { - index := r.next - r.next += 1 - if r.next > len(slice)-1 { - r.next = 0 - } - return index -} From 9c76722c900a35bd88d54999d1b64f7381022cc6 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 23 Feb 2023 18:20:45 +0800 Subject: [PATCH 2/2] push --- cmd/cmdutils/main.go | 17 ++++ cmd/crontask/main.go | 12 ++- internal/crontask/cron_task.go | 43 --------- internal/{crontask => task}/clear_msg.go | 89 ++++++++++++++++--- internal/{crontask => task}/clear_msg_test.go | 2 +- internal/task/cron_task.go | 56 ++++++++++++ pkg/common/db/cache/redis.go | 26 +++--- pkg/common/db/controller/msg.go | 86 +++++++++--------- 8 files changed, 211 insertions(+), 120 deletions(-) create mode 100644 cmd/cmdutils/main.go delete mode 100644 internal/crontask/cron_task.go rename internal/{crontask => task}/clear_msg.go (58%) rename internal/{crontask => task}/clear_msg_test.go (99%) create mode 100644 internal/task/cron_task.go diff --git a/cmd/cmdutils/main.go b/cmd/cmdutils/main.go new file mode 100644 index 000000000..ded252795 --- /dev/null +++ b/cmd/cmdutils/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "Open_IM/internal/task" + "flag" + "fmt" + "time" +) + +func main() { + var userID = flag.String("userID", "", "userID to clear msg and reset seq") + var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq") + var fixAllSeq = flag.Bool("fixAllSeq", false, "fix seq") + flag.Parse() + fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID) + task.FixSeq(*userID, *workingGroupID, *fixAllSeq) +} diff --git a/cmd/crontask/main.go b/cmd/crontask/main.go index 6dcc33ddc..c4ca2aab9 100644 --- a/cmd/crontask/main.go +++ b/cmd/crontask/main.go @@ -1,16 +1,14 @@ package main import ( - "Open_IM/internal/crontask" - "flag" + "Open_IM/internal/task" "fmt" "time" ) func main() { - var userID = flag.String("userID", "", "userID to clear msg and reset seq") - var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq") - flag.Parse() - fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID) - cronTask.StartCronTask(*userID, *workingGroupID) + fmt.Println(time.Now(), "start cronTask") + if err := task.StartCronTask(); err != nil { + panic(err.Error()) + } } diff --git a/internal/crontask/cron_task.go b/internal/crontask/cron_task.go deleted file mode 100644 index be1094661..000000000 --- a/internal/crontask/cron_task.go +++ /dev/null @@ -1,43 +0,0 @@ -package cronTask - -import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/log" - "Open_IM/pkg/utils" - "fmt" - "time" - - "github.com/robfig/cron/v3" -) - -const cronTaskOperationID = "cronTaskOperationID-" -const moduleName = "cron" - -func StartCronTask(userID, workingGroupID string) { - log.NewPrivateLog(moduleName) - log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) - fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) - if userID != "" { - operationID := getCronTaskOperationID() - ClearUsersMsg(operationID, []string{userID}) - } - if workingGroupID != "" { - operationID := getCronTaskOperationID() - ClearSuperGroupMsg(operationID, []string{workingGroupID}) - } - if userID != "" || workingGroupID != "" { - fmt.Println("clear msg finished") - return - } - c := cron.New() - _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll) - if err != nil { - fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) - panic(err) - } - c.Start() - fmt.Println("start cron task success") - for { - time.Sleep(10 * time.Second) - } -} diff --git a/internal/crontask/clear_msg.go b/internal/task/clear_msg.go similarity index 58% rename from internal/crontask/clear_msg.go rename to internal/task/clear_msg.go index d6e30f49b..7dfdb0a7b 100644 --- a/internal/crontask/clear_msg.go +++ b/internal/task/clear_msg.go @@ -1,4 +1,4 @@ -package cronTask +package task import ( "Open_IM/pkg/common/config" @@ -8,20 +8,22 @@ import ( "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" + "fmt" + "github.com/go-redis/redis/v8" "math" ) -type ClearMsgTool struct { +type msgTool struct { msgInterface controller.MsgDatabase userInterface controller.UserDatabase groupInterface controller.GroupDatabase } -func (c *ClearMsgTool) getCronTaskOperationID() string { +func (c *msgTool) getCronTaskOperationID() string { return cronTaskOperationID + utils.OperationIDGenerator() } -func (c *ClearMsgTool) ClearAll() { +func (c *msgTool) ClearAll() { operationID := c.getCronTaskOperationID() ctx := context.Background() tracelog.SetOperationID(ctx, operationID) @@ -43,7 +45,7 @@ func (c *ClearMsgTool) ClearAll() { log.NewInfo(operationID, "============================ start del cron finished ============================") } -func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { +func (c *msgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { for _, userID := range userIDList { if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID) @@ -58,7 +60,7 @@ func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { } } -func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) { +func (c *msgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) { for _, groupID := range workingGroupIDList { userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID) if err != nil { @@ -73,16 +75,20 @@ func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDLis log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID) continue } - //for _, userID := range userIDs { - // c.msgInterface.getgroup - // c.FixGroupUserSeq(ctx, userID, groupID, ) - // - //} + for _, userID := range userIDs { + minSeqCache, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID) + if err != nil { + log.NewError(tracelog.GetOperationID(ctx), "GetGroupUserMinSeq failed", groupID, userID) + continue + } + c.FixGroupUserSeq(ctx, userID, groupID, minSeqCache, maxSeqCache) + + } c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) } } -func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) { +func (c *msgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) { if minSeqCache > maxSeqCache { if err := c.msgInterface.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil { log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache) @@ -92,7 +98,7 @@ func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCach } } -func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) { +func (c *msgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) { if minSeqCache > maxSeqCache { if err := c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache) @@ -102,8 +108,63 @@ func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, group } } -func (c *ClearMsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { +func (c *msgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType) } } + +func (c *msgTool) FixAllSeq(ctx context.Context) { + userIDs, err := c.userInterface.GetAllUserID(ctx) + if err != nil { + panic(err.Error()) + } + for _, userID := range userIDs { + userCurrentMinSeq, err := c.msgInterface.GetUserMinSeq(ctx, userID) + if err != nil && err != redis.Nil { + continue + } + userCurrentMaxSeq, err := c.msgInterface.GetUserMaxSeq(ctx, userID) + if err != nil && err != redis.Nil { + continue + } + if userCurrentMinSeq > userCurrentMaxSeq { + if err = c.msgInterface.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { + fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq) + } + fmt.Println("fix", userID, userCurrentMaxSeq) + } + } + fmt.Println("fix users seq success") + + groupIDs, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup) + if err != nil { + panic(err.Error()) + } + for _, groupID := range groupIDs { + maxSeq, err := c.msgInterface.GetGroupMaxSeq(ctx, groupID) + if err != nil { + fmt.Println("GetGroupMaxSeq failed", groupID) + continue + } + userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID) + if err != nil { + fmt.Println("get groupID", groupID, "failed, try again later") + continue + } + for _, userID := range userIDs { + userMinSeq, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID) + if err != nil && err != redis.Nil { + fmt.Println("GetGroupUserMinSeq failed", groupID, userID) + continue + } + if userMinSeq > maxSeq { + if err = c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil { + fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq) + } + fmt.Println("fix", groupID, userID, maxSeq, userMinSeq) + } + } + } + fmt.Println("fix all seq finished") +} diff --git a/internal/crontask/clear_msg_test.go b/internal/task/clear_msg_test.go similarity index 99% rename from internal/crontask/clear_msg_test.go rename to internal/task/clear_msg_test.go index be0be8197..fc91c2e82 100644 --- a/internal/crontask/clear_msg_test.go +++ b/internal/task/clear_msg_test.go @@ -1,4 +1,4 @@ -package cronTask +package task import ( "Open_IM/pkg/common/constant" diff --git a/internal/task/cron_task.go b/internal/task/cron_task.go new file mode 100644 index 000000000..0e2f4f6d8 --- /dev/null +++ b/internal/task/cron_task.go @@ -0,0 +1,56 @@ +package task + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils" + "context" + "fmt" + "time" + + "github.com/robfig/cron/v3" +) + +const cronTaskOperationID = "cronTaskOperationID-" +const moduleName = "cron" + +func StartCronTask() error { + log.NewPrivateLog(moduleName) + log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) + fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) + clearCronTask := msgTool{} + ctx := context.Background() + operationID := clearCronTask.getCronTaskOperationID() + tracelog.SetOperationID(ctx, operationID) + c := cron.New() + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearCronTask.ClearAll) + if err != nil { + fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) + return err + } + c.Start() + fmt.Println("start cron task success") + for { + time.Sleep(10 * time.Second) + } +} + +func FixSeq(userID, workingGroupID string, fixAllSeq bool) { + log.NewPrivateLog(moduleName) + log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) + clearCronTask := msgTool{} + ctx := context.Background() + operationID := clearCronTask.getCronTaskOperationID() + tracelog.SetOperationID(ctx, operationID) + if userID != "" { + clearCronTask.ClearUsersMsg(ctx, []string{userID}) + } + if workingGroupID != "" { + clearCronTask.ClearSuperGroupMsg(ctx, []string{workingGroupID}) + } + if fixAllSeq { + clearCronTask.FixAllSeq(ctx) + } + fmt.Println("fix seq finished") +} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 6448ff024..d6d3bc110 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -256,7 +256,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl return r.rdb.HDel(context.Background(), key, fields...).Err() } -func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) { +func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) @@ -265,25 +265,25 @@ func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqLi if err != redis.Nil { err2 = err } - failedSeqList = append(failedSeqList, v) + failedSeqs = append(failedSeqs, v) } else { msg := sdkws.MsgData{} err = jsonpb.UnmarshalString(result, &msg) if err != nil { err2 = err - failedSeqList = append(failedSeqList, v) + failedSeqs = append(failedSeqs, v) } else { - seqMsg = append(seqMsg, &msg) + seqMsgs = append(seqMsgs, &msg) } } } - return seqMsg, failedSeqList, err2 + return seqMsgs, failedSeqs, err2 } -func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) { +func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) { pipe := r.rdb.Pipeline() - var failedList []pbChat.MsgDataToMQ - for _, msg := range msgList { + var failedMsgs []pbChat.MsgDataToMQ + for _, msg := range msgs { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) s, err := utils.Pb2String(msg.MsgData) if err != nil { @@ -292,17 +292,17 @@ func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgL err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() //err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() if err != nil { - failedList = append(failedList, *msg) + failedMsgs = append(failedMsgs, *msg) } } - if len(failedList) != 0 { - return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) + if len(failedMsgs) != 0 { + return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) } _, err := pipe.Exec(ctx) return 0, err } -func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { - for _, msg := range msgList { +func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error { + for _, msg := range msgs { key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) err := r.rdb.Del(ctx, key).Err() if err != nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 5c584e18b..997aca61c 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -114,7 +114,7 @@ import ( // return m.database.SetUserMinSeq(ctx, userID, minSeq) //} -type MsgDatabaseInterface interface { +type MsgDatabase interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error // 刪除redis中消息缓存 @@ -140,6 +140,8 @@ type MsgDatabaseInterface interface { GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) // 设置群用户最小seq 直接调用cache SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + // + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) // 设置用户最小seq 直接调用cache SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) @@ -162,97 +164,97 @@ type MsgDatabaseInterface interface { GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) } -type MsgDatabase struct { +type msgDatabase struct { mgo unRelationTb.MsgDocModelInterface cache cache.Cache msg unRelationTb.MsgDocModel } -func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { +func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { +func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { +func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { +func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { +func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { +func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { +func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { +func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { +func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { //TODO implement me panic("implement me") } -func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { - return &MsgDatabase{} +func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase { + return &msgDatabase{} } -func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { +func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { //newTime := utils.GetCurrentTimestampByMill() if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { return errors.New("too large") @@ -336,11 +338,11 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, return nil } -func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { return db.cache.DeleteMessageFromCache(ctx, userID, msgs) } -func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { +func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) if int64(lenList) > db.msg.GetSingleGocMsgNum() { @@ -392,7 +394,7 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin return lastMaxSeq, utils.Wrap(err, "") } -func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { sortkeys.Int64s(seqs) docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) lock := sync.Mutex{} @@ -413,7 +415,7 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i return totalUnExistSeqs, nil } -func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err @@ -426,7 +428,7 @@ func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s return unExistSeqs, nil } -func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { +func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { doc, err := db.mgo.FindOneByDocID(ctx, docID) if err != nil { return nil, nil, nil, err @@ -457,7 +459,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s return seqMsgs, indexes, unExistSeqs, nil } -func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID) if err != nil { return nil, err @@ -465,7 +467,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return nil, err @@ -473,7 +475,7 @@ func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} err = proto.Unmarshal(msgInfo.Msg, msgPb) if err != nil { @@ -482,7 +484,7 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb * return msgPb, nil } -func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { var hasSeqs []int64 singleCount := 0 m := db.msg.GetDocIDSeqsMap(sourceID, seqs) @@ -523,7 +525,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ return seqMsg, nil } -func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { @@ -544,7 +546,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i return successMsgs, nil } -func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { @@ -565,7 +567,7 @@ func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin return successMsgs, nil } -func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { +func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) if err != nil { return err @@ -574,7 +576,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error return utils.Wrap(err, "") } -func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -602,7 +604,7 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, return nil } -func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -628,7 +630,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // seq 70 // set minSeq 21 // recursion 删除list并且返回设置的最小seq -func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { +func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index) if err != nil || msgs.DocID == "" { @@ -690,10 +692,10 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, //log.NewDebug(operationID, sourceID, "continue to", delStruct) // 继续递归 index+1 seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime) - return seq, utils.Wrap(err, "deleteMsg failed") + return seq, err } -func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { +func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID) if err != nil { return 0, 0, 0, 0, err @@ -710,7 +712,7 @@ func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user return } -func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { +func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) if err != nil { return 0, 0, 0, err @@ -722,7 +724,7 @@ func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context return } -func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { +func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return 0, 0, err @@ -744,10 +746,10 @@ func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) ( return } -func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) } -func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return db.cache.SetUserMinSeq(ctx, userID, minSeq) }