Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-05-18 10:32:18 +08:00
commit 87fb6d95ee
5 changed files with 32 additions and 19 deletions

View File

@ -120,19 +120,27 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []
for _, v := range totalMsgs { for _, v := range totalMsgs {
options := utils.Options(v.message.Options) options := utils.Options(v.message.Options)
if !options.IsNotNotification() { if !options.IsNotNotification() {
// 原通知 // clone msg from notificationMsg
notificationMsg := proto.Clone(v.message).(*sdkws.MsgData)
if options.IsSendMsg() { if options.IsSendMsg() {
msg := proto.Clone(v.message).(*sdkws.MsgData)
// 消息 // 消息
if v.message.Options != nil { if v.message.Options != nil {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotNotification(true), utils.WithSendMsg(false)) msg.Options = utils.NewMsgOptions()
} }
storageMsgList = append(storageMsgList, v.message) if options.IsOfflinePush() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithOfflinePush(false))
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true))
}
if options.IsUnreadCount() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithUnreadCount(false))
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true))
}
storageMsgList = append(storageMsgList, msg)
} }
if isStorage(notificationMsg) { if isStorage(v.message) {
storageNotificatoinList = append(storageNotificatoinList, notificationMsg) storageNotificatoinList = append(storageNotificatoinList, v.message)
} else { } else {
notStorageNotificationList = append(notStorageNotificationList, notificationMsg) notStorageNotificationList = append(notStorageNotificationList, v.message)
} }
} else { } else {
if isStorage(v.message) { if isStorage(v.message) {

View File

@ -65,8 +65,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
var isSend bool = true var isSend bool = true
conversationID := utils.GetConversationIDByMsg(req.MsgData) conversationID := utils.GetConversationIDByMsg(req.MsgData)
if !utils.IsNotification(conversationID) {
if utils.MsgIsNotification(req.MsgData) {
isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req) isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -358,10 +358,10 @@ type Notification struct {
func GetOptionsByNotification(cfg NotificationConf) utils.Options { func GetOptionsByNotification(cfg NotificationConf) utils.Options {
opts := utils.NewOptions() opts := utils.NewOptions()
if cfg.UnreadCount { if cfg.UnreadCount {
opts = utils.WithOptions(opts, utils.WithUnreadCount()) opts = utils.WithOptions(opts, utils.WithUnreadCount(true))
} }
if cfg.OfflinePush.Enable { if cfg.OfflinePush.Enable {
opts = utils.WithOptions(opts, utils.WithOfflinePush()) opts = utils.WithOptions(opts, utils.WithOfflinePush(true))
} }
switch cfg.ReliabilityLevel { switch cfg.ReliabilityLevel {
case constant.UnreliableNotification: case constant.UnreliableNotification:

View File

@ -16,7 +16,7 @@ import (
const ( const (
scanCount = 3000 scanCount = 3000
maxRetryTimes = 5 maxRetryTimes = 5
retryInterval = time.Second * 1 retryInterval = time.Millisecond * 100
) )
var errIndex = errors.New("err index") var errIndex = errors.New("err index")
@ -49,7 +49,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil { if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil {
if retryTimes >= m.maxRetryTimes { if retryTimes >= m.maxRetryTimes {
err = errs.ErrInternalServer.Wrap(fmt.Sprintf("delete cache error: %v, keys: %v, retry times %d, please check redis server", err, m.keys, retryTimes)) err = errs.ErrInternalServer.Wrap(fmt.Sprintf("delete cache error: %v, keys: %v, retry times %d, please check redis server", err, m.keys, retryTimes))
log.ZWarn(ctx, "delete cache failed", err, "keys", m.keys) log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys)
return err return err
} }
retryTimes++ retryTimes++
@ -106,11 +106,11 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil return t, nil
} }
if v == "" { if v == "" {
return t, errs.ErrRecordNotFound.Wrap("msgCache is not found") return t, errs.ErrRecordNotFound.Wrap("cache is not found")
} }
err = json.Unmarshal([]byte(v), &t) err = json.Unmarshal([]byte(v), &t)
if err != nil { if err != nil {
log.ZError(ctx, "msgCache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire)
return t, utils.Wrap(err, "") return t, utils.Wrap(err, "")
} }
return t, nil return t, nil

View File

@ -25,6 +25,12 @@ func NewOptions(opts ...OptionsOpt) Options {
return options return options
} }
func NewMsgOptions() Options {
options := make(map[string]bool, 11)
options[constant.IsOfflinePush] = false
return make(map[string]bool)
}
func WithOptions(options Options, opts ...OptionsOpt) Options { func WithOptions(options Options, opts ...OptionsOpt) Options {
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(options)
@ -56,15 +62,15 @@ func WithPersistent() OptionsOpt {
} }
} }
func WithOfflinePush() OptionsOpt { func WithOfflinePush(b bool) OptionsOpt {
return func(options Options) { return func(options Options) {
options[constant.IsOfflinePush] = true options[constant.IsOfflinePush] = b
} }
} }
func WithUnreadCount() OptionsOpt { func WithUnreadCount(b bool) OptionsOpt {
return func(options Options) { return func(options Options) {
options[constant.IsUnreadCount] = true options[constant.IsUnreadCount] = b
} }
} }