From c7e684211d7d346711e742ab615c3c3f21fad61a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 8 Jun 2023 11:54:52 +0800 Subject: [PATCH 1/7] feat: log add prefixName --- pkg/common/cmd/root.go | 2 +- pkg/common/log/zap.go | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index d1cf3c362..be04ac79a 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -26,7 +26,7 @@ func NewRootCmd(name string) (rootCmd *RootCmd) { if err := rootCmd.getConfFromCmdAndInit(cmd); err != nil { panic(err) } - if err := log.InitFromConfig(name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil { + if err := log.InitFromConfig("OpenIM.log.all", name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil { panic(err) } return nil diff --git a/pkg/common/log/zap.go b/pkg/common/log/zap.go index 1d1d5fbc8..9f6fe633a 100644 --- a/pkg/common/log/zap.go +++ b/pkg/common/log/zap.go @@ -32,14 +32,14 @@ var ( ) // InitFromConfig initializes a Zap-based logger -func InitFromConfig(name string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) error { - l, err := NewZapLogger(name, logLevel, isStdout, isJson, logLocation, rotateCount) +func InitFromConfig(loggerPrefixName, moduleName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) error { + l, err := NewZapLogger(loggerPrefixName, moduleName, logLevel, isStdout, isJson, logLocation, rotateCount) if err != nil { return err } pkgLogger = l.WithCallDepth(2) if isJson { - pkgLogger = pkgLogger.WithName(name) + pkgLogger = pkgLogger.WithName(moduleName) } return nil } @@ -61,12 +61,13 @@ func ZError(ctx context.Context, msg string, err error, keysAndValues ...interfa } type ZapLogger struct { - zap *zap.SugaredLogger - level zapcore.Level - loggerName string + zap *zap.SugaredLogger + level zapcore.Level + loggerName string + loggerPrefixName string } -func NewZapLogger(loggerName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) (*ZapLogger, error) { +func NewZapLogger(loggerPrefixName, loggerName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) (*ZapLogger, error) { zapConfig := zap.Config{ Level: zap.NewAtomicLevelAt(logLevelMap[logLevel]), // EncoderConfig: zap.NewProductionEncoderConfig(), @@ -81,7 +82,7 @@ func NewZapLogger(loggerName string, logLevel int, isStdout bool, isJson bool, l // if isStdout { // zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stdout", "stderr") // } - zl := &ZapLogger{level: logLevelMap[logLevel], loggerName: loggerName} + zl := &ZapLogger{level: logLevelMap[logLevel], loggerName: loggerName, loggerPrefixName: loggerPrefixName} opts, err := zl.cores(isStdout, isJson, logLocation, rotateCount) if err != nil { return nil, err @@ -158,7 +159,7 @@ func (l *ZapLogger) timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) } func (l *ZapLogger) getWriter(logLocation string, rorateCount uint) (zapcore.WriteSyncer, error) { - logf, err := rotatelogs.New(logLocation+sp+"OpenIM.log.all"+".%Y-%m-%d", + logf, err := rotatelogs.New(logLocation+sp+l.loggerPrefixName+".%Y-%m-%d", rotatelogs.WithRotationCount(rorateCount), rotatelogs.WithRotationTime(time.Duration(config.Config.Log.RotationTime)*time.Hour), ) From 51ee11f7d85d6eadc6e2fee24c019c6d370d9ac1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 15:16:40 +0800 Subject: [PATCH 2/7] del msg --- .../online_msg_to_mongo_handler.go | 7 +- internal/rpc/group/group.go | 6 +- internal/rpc/msg/revoke.go | 2 +- pkg/common/db/cache/msg.go | 156 +++++++++++------- pkg/common/db/controller/msg.go | 45 ++++- 5 files changed, 145 insertions(+), 71 deletions(-) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index e2b2557b4..5d052fac4 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -44,10 +44,15 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont if err != nil { log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) } - err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) + var seqs []int64 + for _, msg := range msgFromMQ.MsgData { + seqs = append(seqs, msg.Seq) + } + err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) if err != nil { log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) } + mc.msgDatabase.DelUserDeleteMsgsList(ctx, msgFromMQ.ConversationID, seqs) } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fb19ed68e..d39e00e1e 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -624,15 +624,17 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG } func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) { - resp := &pbGroup.GetGroupApplicationListResp{} + pageNumber, showNumber := utils.GetPage(req.Pagination) + groupIDs, err := s.GroupDatabase.FindUserManagedGroupID(ctx, req.FromUserID) if err != nil { return nil, err } + resp := &pbGroup.GetGroupApplicationListResp{} if len(groupIDs) == 0 { return resp, nil } - total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, req.Pagination.PageNumber, req.Pagination.ShowNumber) + total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, pageNumber, showNumber) if err != nil { return nil, err } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 33856a109..bc6a7a551 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -16,7 +16,7 @@ import ( ) func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { - defer log.ZInfo(ctx, "RevokeMsg return line") + defer log.ZDebug(ctx, "RevokeMsg return line") if req.UserID == "" { return nil, errs.ErrArgs.Wrap("user_id is empty") } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 2dfb93bc1..af8ef2f3d 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -31,12 +31,13 @@ const ( appleDeviceToken = "DEVICE_TOKEN" getuiToken = "GETUI_TOKEN" getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - messageReadCache = "MESSAGE_READ_CACHE:" signalCache = "SIGNAL_CACHE:" signalListCache = "SIGNAL_LIST_CACHE:" fcmToken = "FCM_TOKEN:" + messageCache = "MESSAGE_CACHE:" + messageDelUserList = "MESSAGE_DEL_USER_LIST:" + userDelMessagesList = "USER_DEL_MESSAGES_LIST:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" exTypeKeyLocker = "EX_LOCK:" @@ -63,33 +64,40 @@ type SeqCache interface { GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) } -type MsgModel interface { - SeqCache - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error - GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) - SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error - DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error - GetMessagesBySeq(ctx context.Context, conversationID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int, error) - DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error - CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error +type thirdCache interface { HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) DelUserSignalList(ctx context.Context, userID string) error - DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) - SetSendMsgStatus(ctx context.Context, id string, status int32) error - GetSendMsgStatus(ctx context.Context, id string) (int32, error) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) DelFcmToken(ctx context.Context, account string, platformID int) error IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) +} + +type MsgModel interface { + SeqCache + thirdCache + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) + SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error + DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error + GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) + UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error + DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) + DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error + GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) + CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error + DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error @@ -98,9 +106,6 @@ type MsgModel interface { SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - - GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) - DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { @@ -327,10 +332,73 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, return len(failedMsgs), err } -func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { +func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { + return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq)) +} + +func (c *msgCache) getUserDelList(conversationID, userID string) string { + return userDelMessagesList + conversationID + ":" + userID +} + +func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error { pipe := c.rdb.Pipeline() - for _, v := range msgList { - if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { + for _, seq := range seqs { + delUserListKey := c.getMessageDelUserListKey(conversationID, seq) + userDelListKey := c.getUserDelList(conversationID, userID) + err := pipe.SAdd(ctx, delUserListKey, userID).Err() + if err != nil { + return errs.Wrap(err) + } + err = pipe.SAdd(ctx, userDelListKey, seq).Err() + if err != nil { + return errs.Wrap(err) + } + if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + } + _, err := pipe.Exec(ctx) + return errs.Wrap(err) +} + +func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) { + result, err := c.rdb.SMembers(ctx, c.getUserDelList(userID, conversationID)).Result() + if err != nil { + return nil, errs.Wrap(err) + } + seqs = make([]int64, len(result)) + for i, v := range result { + seqs[i] = utils.StringToInt64(v) + } + return seqs, nil +} + +func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { + for _, seq := range seqs { + delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() + if err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + continue + } + for _, userID := range delUsers { + if err := c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err(); err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID) + } + + if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + } + } + } +} + +func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { + pipe := c.rdb.Pipeline() + for _, seq := range seqs { + if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { return errs.Wrap(err) } } @@ -572,41 +640,3 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } - -func (c *msgCache) NewCache() MsgModel { - return &msgCache{ - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), - expireTime: c.expireTime, - rcClient: c.rcClient, - } -} - -func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string { - return messageReadCache + docID + "_" + strconv.Itoa(int(seq)) -} - -func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) { - key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq) - for i, _key := range keys { - if key == _key { - return i, nil - } - } - return 0, errIndex -} - -func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { - var keys []string - for _, seq := range seqs { - keys = append(keys, c.getMsgReadCacheKey(docID, seq)) - } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { - return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) - }) -} - -func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel { - cache := c.NewCache() - c.AddKeys(c.getMsgReadCacheKey(docID, seq)) - return cache -} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 9a59e9030..5f4ad098c 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -37,7 +37,8 @@ type CommonMsgDatabase interface { // mark as read MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error // 刪除redis中消息缓存 - DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error + DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error + DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) // incrSeq然后批量插入缓存 BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) @@ -316,8 +317,12 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI return nil } -func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { - return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs) +func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { + return db.cache.DeleteMessages(ctx, conversationID, seqs) +} + +func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { + db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs) } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { @@ -474,13 +479,29 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin break } } - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil { if err != redis.Nil { prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.ZError(ctx, "get message from redis exception", err, conversationID, seqs) } } + var successMsgs []*sdkws.MsgData + if len(cachedMsgs) > 0 { + delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return 0, 0, nil, err + } else { + log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID) + } + + for _, msg := range cachedMsgs { + if !utils.Contain(msg.Seq, delSeqs...) { + successMsgs = append(successMsgs, msg) + } + } + } + log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs) if len(failedSeqs) != 0 { log.ZDebug(ctx, "msgs not exist in redis", err, "seqs", seqs) } @@ -635,6 +656,9 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { + if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil { + return err + } for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { @@ -648,6 +672,19 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { + msgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + if err != nil { + log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) + return err + } + var cacheSeqs []int64 + for _, msg := range msgs { + cacheSeqs = append(cacheSeqs, msg.Seq) + } + if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil { + return err + } + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { From 47541c1d87702acca8f212f253cd68c08b871578 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 15:19:08 +0800 Subject: [PATCH 3/7] zk --- pkg/discoveryregistry/zookeeper/zk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/discoveryregistry/zookeeper/zk.go b/pkg/discoveryregistry/zookeeper/zk.go index 2fd9e14c7..e3232c706 100644 --- a/pkg/discoveryregistry/zookeeper/zk.go +++ b/pkg/discoveryregistry/zookeeper/zk.go @@ -103,7 +103,7 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien client.CloseZK() return nil, err } - resolver.Register(client) + // resolver.Register(client) var wg sync.WaitGroup wg.Add(2) go client.refresh(&wg) From e8dd795f4c4a10ab6cd624083fd744ac6b89108e Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 15:55:25 +0800 Subject: [PATCH 4/7] groupMemberNotification log --- internal/rpc/group/group.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index d39e00e1e..3e9ff3b64 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1386,7 +1386,10 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr } } if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil { - s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID) + log.ZDebug(ctx, "setGroupMemberInfo notification", "member", member.UserID) + if err := s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID); err != nil { + log.ZError(ctx, "setGroupMemberInfo notification failed", err, "member", member.UserID, "groupID", member.GroupID) + } } } return resp, nil From 85e82b95ba366239643f0c92c28bdb0c75533bf9 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 16:03:01 +0800 Subject: [PATCH 5/7] add log --- pkg/rpcclient/notification/group.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index f60058bfa..facaa6275 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -86,10 +86,12 @@ func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID s if err != nil { return nil, err } + log.ZDebug(ctx, "getGroupMembers", "members", members) users, err := g.getUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } + log.ZDebug(ctx, "getUsersInfoMap", "users", users) res := make([]*sdkws.GroupMemberFullInfo, 0, len(members)) for _, member := range members { user, ok := users[member.UserID] From cd0acbba871cb2f3966dc53b51f90ab37fac64b4 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 16:13:11 +0800 Subject: [PATCH 6/7] fix notification bug --- pkg/rpcclient/notification/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index facaa6275..4f80c4a0b 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -506,7 +506,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con if err != nil { return err } - user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) + user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID}) if err != nil { return err } From f230c062447ec4b7107c08f3bf8c4e009b916f95 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 17:00:46 +0800 Subject: [PATCH 7/7] ctx --- internal/push/push_to_client.go | 2 +- internal/rpc/user/user.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 9c38c4069..31cafb0f0 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -202,7 +202,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if msg.ContentType != constant.SignalingNotification { notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) if err != nil { - log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID) + // log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID) return err } needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 5db39a8d1..7f13c3aab 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -98,11 +98,9 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI if err != nil { return nil, err } - go func() { - for _, v := range friends { - s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx)) - } - }() + for _, v := range friends { + s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx)) + } s.notificationSender.UserInfoUpdatedNotification(ctx, mcontext.GetOpUserID(ctx), req.UserInfo.UserID) return resp, nil }