fengyun.rui a04c9d99ae refactor: client in msggateway (#1343)
* refactor: clietn in msggateway

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* perf: add sync.pool for req object

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

feat: use dummy pusher by default (#1349)

Add Prometheus monitoring function (#1337)

* Code adaptation k8s: service discovery and registration adaptation, configuration adaptation

* Initial submission of the help charts script for openim API

* change the help charts script

* change the help charts script

* change helm chart codes

* change dockerfiles script

* change chart script:add configmap mounts

* change chart script:change repository

* change chart script:msggateway add one service

* change config.yaml

* roll back some config values

* change chart script:change Ingress rule with a rewrite annotation

* add mysql charts scrible

* change chart script:add mysql.config.yaml

* add nfs provisioner charts

* change chart script:add nfs.config.yaml

* add ingress-nginx charts

* change chart script:add ingress-nginx.config.yaml

* add redis &mongodb charts

* add kafka&minio charts

* change chart script:change redis.values.yaml

* change chart script:add redis.config.yaml

* change chart script:change redis.config.yaml

* change chart script:change mongodb.value.yaml

* change chart script:change mongodb.value.yaml

* change chart script:add mongodb.config.yaml

* change chart script:change minio.values.yaml

* change chart script:add minio.config.yaml

* change chart script:change kafka.values.yaml

* change chart script:add kafka.config.yaml

* change chart script:change services.config.yaml

* bug fix:Delete websocket's Port restrictions

* bug fix:change port value

* change chart script:Submit a stable version script

* fix bug:Implement option interface

* fix bug:change K8sDR.Register

* change config.yaml

* change chats script:minio service add ingress

* change chats script:minio service add ingress

* change chats script:kafka.replicaCount=3& change minio.api ingress

* delete change chats script

* change config.yaml

* change openim.yaml

* merge go.sum

* Add monitoring function and struct for Prometheus on gin and GRPC

* Add GRPC and gin server monitoring logic

* Add GRPC and gin server monitoring logic2

* Add GRPC and gin server monitoring logic3

* Add GRPC and gin server monitoring logic4

* Add GRPC and gin server monitoring logic5

* Add GRPC and gin server monitoring logic6

* Add GRPC and gin server monitoring logic7

* delete:old monitoring code

* add for test

* fix bug:change packname

* fix bug:delete getPromPort funciton

* fix bug:delete getPromPort funciton

* fix bug:change logs

* fix bug:change registerName logic in GetGrpcCusMetrics function

* add getPrometheus url api

* fix:config path logic

* fix:prometheus enable function

* fix:prometheus enable function

* fix:transfer Multi process monitoring logic

* del:del not using manifest

* fix:openim-msgtransfer.sh

* fix:openim-msgtransfer.sh

---------

Co-authored-by: lin.huang <lin.huang@apulis.com>
Co-authored-by: Xinwei Xiong <3293172751@qq.com>

fix: initiateUpload sign list number (#1358)

* optimize scheduled deletion

* optimize scheduled deletion

* optimize scheduled deletion

* optimize scheduled deletion

* minio cache

* fix: conflicts

* feat: minio cache

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize

* fix: initiateUpload sign list number

fix: msg pull change and fcm redis flag fix. (#1367)

* fix: to start im or chat, ZooKeeper must be started first.

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: go mod update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: get all userID

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: msggateway add online status call

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* chore: network mode change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* feat: add api of get server time

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: remove go work sum

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: pull message add isRead field

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: check msg-transfer script

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: start don't kill old process

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: check component

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: pull message set isRead only message come from single.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add ex field to update group info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: message log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: send message test log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* test: remove info log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: api of send message add sendTime field.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add callback for update user's info.

* cicd: robot automated Change

* fix: change callback command name.

* cicd: robot automated Change

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: user status change.

* cicd: robot automated Change

* fix: user status change.

* fix: user status change.

* fix: user status change.

* cicd: robot automated Change

* fix: ws close when user logout.

* fix: remove repeat platform on online status.

* cicd: robot automated Change

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

* fix: reset branch

* fix: not support redis cluster. CROSSSLOT Keys in request don't hash to the same slot

* fix: update user.FaceURL do not trigger GroupMemberInfoSetNotification

* cicd: robot automated Change

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: zk add close to avoid zk block.

* fix: go mod update.

* fix: msg pull change and fcm redis flag fix.

---------

Signed-off-by: Gordon <1432970085@qq.com>
Signed-off-by: withchao <993506633@qq.com>
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: withchao <993506633@qq.com>
Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com>
Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
Co-authored-by: withchao <withchao@users.noreply.github.com>

fix: sync close ws conn when kick old user avoid wrong trigger order about  online status. (#1368)

Update README-zh_CN.md

Update README-zh_CN.md

fix: GetUserReqApplicationList error when there is a disbanded group chat (#1374)

fix: error when querying some information about disbanded group (#1376)

* fix: GetUserReqApplicationList error when there is a disbanded group chat

* fix: error when querying some information about disbanded group

fix: GetUserReqApplicationList dismissed group error (#1378)

* fix: GetUserReqApplicationList error when there is a disbanded group chat

* fix: error when querying some information about disbanded group

* fix: GetUserReqApplicationList dismissed group error

refactor: lower the level of code nesting (#1370)

* refactor: lower the level of code nesting

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* refactor: lower the level of code nesting

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

☀️ feat: Enhancing OpenIM with Integrated E2E Testing and CI/CD Enhancements (#1359)

* cicd: robot automated Change

* feat: add api test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add api test make file

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add openim e2e test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add openim e2e test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* fix: Fixed some unused scripts and some names

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* docs: optimize openim docs

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add prom address

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add openim info test

* feat: add openim images config path

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* fix: fix tim file rename

* fix: fix tim file rename

* fix: fix tim file rename

* fix: fix tim file rename

* fix: add openim test e2e

* feat: add openim test .keep

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: add openim test .keep

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: openim test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: openim test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

* feat: openim test

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
Co-authored-by: cubxxw <cubxxw@users.noreply.github.com>

perf: auto set runtime maxprocs in docker (#1339)

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

build: build openim image (#1381)

perf: improve gzip performance with sync.pool (#1321)

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
Co-authored-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

fix: add kafka compress type and producer ack params (#1310)

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

delete not using files

cicd: robot automated Change

add prometheus docker-compose for monitor

fix prometheus.yaml

fix environment.sh

fix init-config.sh

fix init-config.sh

fix env_template.yaml

fix docker-compose.yml

fix docker-compose.yml

add openim_admin_front service

change openim-admin-front

del not using files

add node-exporter-dashaboard.yaml
2023-11-13 21:31:48 +08:00

747 lines
27 KiB
Go

// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"errors"
"strconv"
"time"
"github.com/dtm-labs/rockscache"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/tools/errs"
"github.com/gogo/protobuf/jsonpb"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/redis/go-redis/v9"
)
const (
maxSeq = "MAX_SEQ:"
minSeq = "MIN_SEQ:"
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
hasReadSeq = "HAS_READ_SEQ:"
appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FCM_TOKEN = "FCM_TOKEN:"
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
)
type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
// k: conversation, v :seq
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}
type thirdCache interface {
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
}
type MsgModel interface {
SeqCache
thirdCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error
GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error)
CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
return &msgCache{rdb: client}
}
type msgCache struct {
metaCache
rdb redis.UniversalClient
expireTime time.Duration
rcClient *rockscache.Client
msgDocDatabase unrelationtb.MsgDocModelInterface
}
func (c *msgCache) getMaxSeqKey(conversationID string) string {
return maxSeq + conversationID
}
func (c *msgCache) getMinSeqKey(conversationID string) string {
return minSeq + conversationID
}
func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID
}
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64())
}
func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := utils.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
//pipe := c.rdb.Pipeline()
//for _, v := range items {
// if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
// }
//}
//result, err := pipe.Exec(ctx)
//if err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
//}
//m = make(map[string]int64, len(items))
//for i, v := range result {
// seq := v.(*redis.StringCmd)
// if seq.Err() != nil && seq.Err() != redis.Nil {
// return nil, errs.Wrap(v.Err())
// }
// val := utils.StringToInt64(seq.Val())
// if val != 0 {
// m[items[i]] = val
// }
//}
//return m, nil
}
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
//pipe := c.rdb.Pipeline()
//for k, seq := range seqs {
// err := pipe.Set(ctx, getkey(k), seq, 0).Err()
// if err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return err
}
func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64())
}
func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64())
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
}
func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
m, err := c.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, errs.Wrap(err)
}
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, nil
}
func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform)
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err())
}
func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform)
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
}
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
}
func (c *msgCache) allMessageCacheKey(conversationID string) string {
return messageCache + conversationID + "_*"
}
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
for _, seq := range seqs {
res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
if err != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res, &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
}
if msg.Status == constant.MsgDeleted {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, &msg)
}
return
//pipe := c.rdb.Pipeline()
//for _, v := range seqs {
// // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
// key := c.getMessageCacheKey(conversationID, v)
// if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
// return nil, nil, err
// }
//}
//result, err := pipe.Exec(ctx)
//for i, v := range result {
// cmd := v.(*redis.StringCmd)
// if cmd.Err() != nil {
// failedSeqs = append(failedSeqs, seqs[i])
// } else {
// msg := sdkws.MsgData{}
// err = msgprocessor.String2Pb(cmd.Val(), &msg)
// if err == nil {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, &msg)
// continue
// }
// } else {
// log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val())
// }
// failedSeqs = append(failedSeqs, seqs[i])
// }
//}
//return seqMsgs, failedSeqs, err
}
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
for _, msg := range msgs {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return 0, errs.Wrap(err)
}
key := c.getMessageCacheKey(conversationID, msg.Seq)
if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return 0, errs.Wrap(err)
}
}
return len(msgs), nil
//pipe := c.rdb.Pipeline()
//var failedMsgs []*sdkws.MsgData
//for _, msg := range msgs {
// key := c.getMessageCacheKey(conversationID, msg.Seq)
// s, err := msgprocessor.Pb2String(msg)
// if err != nil {
// return 0, errs.Wrap(err)
// }
// err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
// if err != nil {
// failedMsgs = append(failedMsgs, msg)
// log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs)
// }
//}
//_, err := pipe.Exec(ctx)
//return len(failedMsgs), err
}
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
}
func (c *msgCache) getUserDelList(conversationID, userID string) string {
return userDelMessagesList + conversationID + ":" + userID
}
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
for _, seq := range seqs {
delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
userDelListKey := c.getUserDelList(conversationID, userID)
err := c.rdb.SAdd(ctx, delUserListKey, userID).Err()
if err != nil {
return errs.Wrap(err)
}
err = c.rdb.SAdd(ctx, userDelListKey, seq).Err()
if err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
//pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
// userDelListKey := c.getUserDelList(conversationID, userID)
// err := pipe.SAdd(ctx, delUserListKey, userID).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// err = pipe.SAdd(ctx, userDelListKey, seq).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
}
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
result, err := c.rdb.SMembers(ctx, c.getUserDelList(conversationID, userID)).Result()
if err != nil {
return nil, errs.Wrap(err)
}
seqs = make([]int64, len(result))
for i, v := range result {
seqs[i] = utils.StringToInt64(v)
}
return seqs, nil
}
func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
for _, seq := range seqs {
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
if err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
continue
}
if len(delUsers) > 0 {
var failedFlag bool
for _, userID := range delUsers {
err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
if err != nil {
failedFlag = true
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
}
}
if !failedFlag {
if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
}
}
}
}
//for _, seq := range seqs {
// delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
// if err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// continue
// }
// if len(delUsers) > 0 {
// pipe := c.rdb.Pipeline()
// var failedFlag bool
// for _, userID := range delUsers {
// err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
// if err != nil {
// failedFlag = true
// log.ZWarn(
// ctx,
// "DelUserDeleteMsgsList failed",
// err,
// "conversationID",
// conversationID,
// "seq",
// seq,
// "userID",
// userID,
// )
// }
// }
// if !failedFlag {
// if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
// if _, err := pipe.Exec(ctx); err != nil {
// log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
//}
}
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
for _, seq := range seqs {
if err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
//pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
}
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
if errors.Is(err, redis.Nil) {
return nil
}
if err != nil {
return errs.Wrap(err)
}
for _, v := range vals {
if err := c.rdb.Del(ctx, v).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
//pipe := c.rdb.Pipeline()
//for _, v := range vals {
// if err := pipe.Del(ctx, v).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err = pipe.Exec(ctx)
//return errs.Wrap(err)
}
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs {
key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
}
return errs.Wrap(err)
}
var msg sdkws.MsgData
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := msgprocessor.Pb2String(&msg)
if err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result())
}
func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result())
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
}
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), errs.Wrap(err)
}
func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result())
}
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Err())
}
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err)
}
func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
}
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.Del(ctx, key).Err())
}
func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (c *msgCache) SetMessageTypeKeyValue(
ctx context.Context,
clientMsgID string,
sessionType int32,
typeKey, value string,
) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (c *msgCache) GetOneMessageAllReactionList(
ctx context.Context,
clientMsgID string,
sessionType int32,
) (map[string]string, error) {
return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (c *msgCache) DeleteOneMessageKey(
ctx context.Context,
clientMsgID string,
sessionType int32,
subKey string,
) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}