Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-06-08 17:37:08 +08:00
commit b536f489a4
11 changed files with 168 additions and 90 deletions

View File

@ -44,10 +44,15 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
if err != nil { if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
} }
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) var seqs []int64
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil { if err != nil {
log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
} }
mc.msgDatabase.DelUserDeleteMsgsList(ctx, msgFromMQ.ConversationID, seqs)
} }
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

View File

@ -202,7 +202,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if msg.ContentType != constant.SignalingNotification { if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil { if err != nil {
log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID) // log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID)
return err return err
} }
needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs) needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs)

View File

@ -624,15 +624,17 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
} }
func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) { func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) {
resp := &pbGroup.GetGroupApplicationListResp{} pageNumber, showNumber := utils.GetPage(req.Pagination)
groupIDs, err := s.GroupDatabase.FindUserManagedGroupID(ctx, req.FromUserID) groupIDs, err := s.GroupDatabase.FindUserManagedGroupID(ctx, req.FromUserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp := &pbGroup.GetGroupApplicationListResp{}
if len(groupIDs) == 0 { if len(groupIDs) == 0 {
return resp, nil return resp, nil
} }
total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, req.Pagination.PageNumber, req.Pagination.ShowNumber) total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, pageNumber, showNumber)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1384,7 +1386,10 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
} }
} }
if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil { if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil {
s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID) log.ZDebug(ctx, "setGroupMemberInfo notification", "member", member.UserID)
if err := s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID); err != nil {
log.ZError(ctx, "setGroupMemberInfo notification failed", err, "member", member.UserID, "groupID", member.GroupID)
}
} }
} }
return resp, nil return resp, nil

View File

@ -16,7 +16,7 @@ import (
) )
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
defer log.ZInfo(ctx, "RevokeMsg return line") defer log.ZDebug(ctx, "RevokeMsg return line")
if req.UserID == "" { if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("user_id is empty") return nil, errs.ErrArgs.Wrap("user_id is empty")
} }

View File

@ -98,11 +98,9 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil { if err != nil {
return nil, err return nil, err
} }
go func() { for _, v := range friends {
for _, v := range friends { s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx))
s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx)) }
}
}()
s.notificationSender.UserInfoUpdatedNotification(ctx, mcontext.GetOpUserID(ctx), req.UserInfo.UserID) s.notificationSender.UserInfoUpdatedNotification(ctx, mcontext.GetOpUserID(ctx), req.UserInfo.UserID)
return resp, nil return resp, nil
} }

View File

@ -26,7 +26,7 @@ func NewRootCmd(name string) (rootCmd *RootCmd) {
if err := rootCmd.getConfFromCmdAndInit(cmd); err != nil { if err := rootCmd.getConfFromCmdAndInit(cmd); err != nil {
panic(err) panic(err)
} }
if err := log.InitFromConfig(name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil { if err := log.InitFromConfig("OpenIM.log.all", name, config.Config.Log.RemainLogLevel, config.Config.Log.IsStdout, config.Config.Log.IsJson, config.Config.Log.StorageLocation, config.Config.Log.RemainRotationCount); err != nil {
panic(err) panic(err)
} }
return nil return nil

View File

@ -31,12 +31,13 @@ const (
appleDeviceToken = "DEVICE_TOKEN" appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN" getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID" getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
messageReadCache = "MESSAGE_READ_CACHE:"
signalCache = "SIGNAL_CACHE:" signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:" signalListCache = "SIGNAL_LIST_CACHE:"
fcmToken = "FCM_TOKEN:" fcmToken = "FCM_TOKEN:"
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:" exTypeKeyLocker = "EX_LOCK:"
@ -63,33 +64,40 @@ type SeqCache interface {
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
} }
type MsgModel interface { type thirdCache interface {
SeqCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error
GetMessagesBySeq(ctx context.Context, conversationID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int, error)
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error
CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error) GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
}
type MsgModel interface {
SeqCache
thirdCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error
GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error)
CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
@ -98,9 +106,6 @@ type MsgModel interface {
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
} }
func NewMsgCacheModel(client redis.UniversalClient) MsgModel { func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
@ -327,10 +332,73 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string,
return len(failedMsgs), err return len(failedMsgs), err
} }
func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
}
func (c *msgCache) getUserDelList(conversationID, userID string) string {
return userDelMessagesList + conversationID + ":" + userID
}
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
for _, v := range msgList { for _, seq := range seqs {
if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
userDelListKey := c.getUserDelList(conversationID, userID)
err := pipe.SAdd(ctx, delUserListKey, userID).Err()
if err != nil {
return errs.Wrap(err)
}
err = pipe.SAdd(ctx, userDelListKey, seq).Err()
if err != nil {
return errs.Wrap(err)
}
if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
}
_, err := pipe.Exec(ctx)
return errs.Wrap(err)
}
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
result, err := c.rdb.SMembers(ctx, c.getUserDelList(userID, conversationID)).Result()
if err != nil {
return nil, errs.Wrap(err)
}
seqs = make([]int64, len(result))
for i, v := range result {
seqs[i] = utils.StringToInt64(v)
}
return seqs, nil
}
func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
for _, seq := range seqs {
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
if err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
continue
}
for _, userID := range delUsers {
if err := c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
}
if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
}
}
}
}
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
pipe := c.rdb.Pipeline()
for _, seq := range seqs {
if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
@ -572,41 +640,3 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
} }
func (c *msgCache) NewCache() MsgModel {
return &msgCache{
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
expireTime: c.expireTime,
rcClient: c.rcClient,
}
}
func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string {
return messageReadCache + docID + "_" + strconv.Itoa(int(seq))
}
func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) {
key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq)
for i, _key := range keys {
if key == _key {
return i, nil
}
}
return 0, errIndex
}
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
})
}
func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel {
cache := c.NewCache()
c.AddKeys(c.getMsgReadCacheKey(docID, seq))
return cache
}

View File

@ -37,7 +37,8 @@ type CommonMsgDatabase interface {
// mark as read // mark as read
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
// 刪除redis中消息缓存 // 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
// incrSeq然后批量插入缓存 // incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
@ -316,8 +317,12 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
return nil return nil
} }
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs) return db.cache.DeleteMessages(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
@ -474,13 +479,29 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
break break
} }
} }
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, conversationID, seqs) log.ZError(ctx, "get message from redis exception", err, conversationID, seqs)
} }
} }
var successMsgs []*sdkws.MsgData
if len(cachedMsgs) > 0 {
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
} else {
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID)
}
for _, msg := range cachedMsgs {
if !utils.Contain(msg.Seq, delSeqs...) {
successMsgs = append(successMsgs, msg)
}
}
}
log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
if len(failedSeqs) != 0 { if len(failedSeqs) != 0 {
log.ZDebug(ctx, "msgs not exist in redis", err, "seqs", seqs) log.ZDebug(ctx, "msgs not exist in redis", err, "seqs", seqs)
} }
@ -635,6 +656,9 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
} }
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
return err
}
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
var indexes []int var indexes []int
for _, seq := range seqs { for _, seq := range seqs {
@ -648,6 +672,19 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
} }
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
msgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
return err
}
var cacheSeqs []int64
for _, msg := range msgs {
cacheSeqs = append(cacheSeqs, msg.Seq)
}
if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil {
return err
}
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
for _, seq := range seqs { for _, seq := range seqs {
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {

View File

@ -32,14 +32,14 @@ var (
) )
// InitFromConfig initializes a Zap-based logger // InitFromConfig initializes a Zap-based logger
func InitFromConfig(name string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) error { func InitFromConfig(loggerPrefixName, moduleName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) error {
l, err := NewZapLogger(name, logLevel, isStdout, isJson, logLocation, rotateCount) l, err := NewZapLogger(loggerPrefixName, moduleName, logLevel, isStdout, isJson, logLocation, rotateCount)
if err != nil { if err != nil {
return err return err
} }
pkgLogger = l.WithCallDepth(2) pkgLogger = l.WithCallDepth(2)
if isJson { if isJson {
pkgLogger = pkgLogger.WithName(name) pkgLogger = pkgLogger.WithName(moduleName)
} }
return nil return nil
} }
@ -61,12 +61,13 @@ func ZError(ctx context.Context, msg string, err error, keysAndValues ...interfa
} }
type ZapLogger struct { type ZapLogger struct {
zap *zap.SugaredLogger zap *zap.SugaredLogger
level zapcore.Level level zapcore.Level
loggerName string loggerName string
loggerPrefixName string
} }
func NewZapLogger(loggerName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) (*ZapLogger, error) { func NewZapLogger(loggerPrefixName, loggerName string, logLevel int, isStdout bool, isJson bool, logLocation string, rotateCount uint) (*ZapLogger, error) {
zapConfig := zap.Config{ zapConfig := zap.Config{
Level: zap.NewAtomicLevelAt(logLevelMap[logLevel]), Level: zap.NewAtomicLevelAt(logLevelMap[logLevel]),
// EncoderConfig: zap.NewProductionEncoderConfig(), // EncoderConfig: zap.NewProductionEncoderConfig(),
@ -81,7 +82,7 @@ func NewZapLogger(loggerName string, logLevel int, isStdout bool, isJson bool, l
// if isStdout { // if isStdout {
// zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stdout", "stderr") // zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stdout", "stderr")
// } // }
zl := &ZapLogger{level: logLevelMap[logLevel], loggerName: loggerName} zl := &ZapLogger{level: logLevelMap[logLevel], loggerName: loggerName, loggerPrefixName: loggerPrefixName}
opts, err := zl.cores(isStdout, isJson, logLocation, rotateCount) opts, err := zl.cores(isStdout, isJson, logLocation, rotateCount)
if err != nil { if err != nil {
return nil, err return nil, err
@ -158,7 +159,7 @@ func (l *ZapLogger) timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder)
} }
func (l *ZapLogger) getWriter(logLocation string, rorateCount uint) (zapcore.WriteSyncer, error) { func (l *ZapLogger) getWriter(logLocation string, rorateCount uint) (zapcore.WriteSyncer, error) {
logf, err := rotatelogs.New(logLocation+sp+"OpenIM.log.all"+".%Y-%m-%d", logf, err := rotatelogs.New(logLocation+sp+l.loggerPrefixName+".%Y-%m-%d",
rotatelogs.WithRotationCount(rorateCount), rotatelogs.WithRotationCount(rorateCount),
rotatelogs.WithRotationTime(time.Duration(config.Config.Log.RotationTime)*time.Hour), rotatelogs.WithRotationTime(time.Duration(config.Config.Log.RotationTime)*time.Hour),
) )

View File

@ -103,7 +103,7 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien
client.CloseZK() client.CloseZK()
return nil, err return nil, err
} }
resolver.Register(client) // resolver.Register(client)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
go client.refresh(&wg) go client.refresh(&wg)

View File

@ -86,10 +86,12 @@ func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID s
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.ZDebug(ctx, "getGroupMembers", "members", members)
users, err := g.getUsersInfoMap(ctx, userIDs) users, err := g.getUsersInfoMap(ctx, userIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.ZDebug(ctx, "getUsersInfoMap", "users", users)
res := make([]*sdkws.GroupMemberFullInfo, 0, len(members)) res := make([]*sdkws.GroupMemberFullInfo, 0, len(members))
for _, member := range members { for _, member := range members {
user, ok := users[member.UserID] user, ok := users[member.UserID]
@ -504,7 +506,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
if err != nil { if err != nil {
return err return err
} }
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID})
if err != nil { if err != nil {
return err return err
} }