Merge branch 'openimsdk:main' into main

This commit is contained in:
chao 2024-12-27 14:13:28 +08:00 committed by GitHub
commit 9f1fff6127
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 623 additions and 1241 deletions

View File

@ -1,3 +1,188 @@
# OpenIM Application Containerization Deployment Guide
# Kubernetes Deployment
view deploy [README](./deploy/README.md)
## Resource Requests
- CPU: 2 cores
- Memory: 4 GiB
- Disk usage: 20 GiB (on Node)
## Preconditions
ensure that you have already deployed the following components:
- Redis
- MongoDB
- Kafka
- MinIO
## Origin Deploy
### Enter the target dir
`cd ./deployments/deploy/`
### Deploy configs and dependencies
Upate your configMap `openim-config.yml`. **You can check the official docs for more details.**
In `openim-config.yml`, you need modify the following configurations:
**discovery.yml**
- `kubernetes.namespace`: default is `default`, you can change it to your namespace.
**mongodb.yml**
- `address`: set to your already mongodb address or mongo Service name and port in your deployed.
- `database`: set to your mongodb database name.(Need have a created database.)
- `authSource`: set to your mongodb authSource. (authSource is specify the database name associated with the user's credentials, user need create in this database.)
**kafka.yml**
- `address`: set to your already kafka address or kafka Service name and port in your deployed.
**redis.yml**
- `address`: set to your already redis address or redis Service name and port in your deployed.
**minio.yml**
- `internalAddress`: set to your minio Service name and port in your deployed.
- `externalAddress`: set to your already expose minio external address.
### Set the secret
A Secret is an object that contains a small amount of sensitive data. Such as password and secret. Secret is similar to ConfigMaps.
#### Redis:
Update the `redis-password` value in `redis-secret.yml` to your Redis password encoded in base64.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: openim-redis-secret
type: Opaque
data:
redis-password: b3BlbklNMTIz # update to your redis password encoded in base64, if need empty, you can set to ""
```
#### Mongo:
Update the `mongo_openim_username`, `mongo_openim_password` value in `mongo-secret.yml` to your Mongo username and password encoded in base64.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: openim-mongo-secret
type: Opaque
data:
mongo_openim_username: b3BlbklN # update to your mongo username encoded in base64, if need empty, you can set to "" (this user credentials need in authSource database).
mongo_openim_password: b3BlbklNMTIz # update to your mongo password encoded in base64, if need empty, you can set to ""
```
#### Minio:
Update the `minio-root-user` and `minio-root-password` value in `minio-secret.yml` to your MinIO accessKeyID and secretAccessKey encoded in base64.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: openim-minio-secret
type: Opaque
data:
minio-root-user: cm9vdA== # update to your minio accessKeyID encoded in base64, if need empty, you can set to ""
minio-root-password: b3BlbklNMTIz # update to your minio secretAccessKey encoded in base64, if need empty, you can set to ""
```
#### Kafka:
Update the `kafka-password` value in `kafka-secret.yml` to your Kafka password encoded in base64.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: openim-kafka-secret
type: Opaque
data:
kafka-password: b3BlbklNMTIz # update to your kafka password encoded in base64, if need empty, you can set to ""
```
### Apply the secret.
```shell
kubectl apply -f redis-secret.yml -f minio-secret.yml -f mongo-secret.yml -f kafka-secret.yml
```
### Apply all config
`kubectl apply -f ./openim-config.yml`
> Attation: If you use `default` namespace, you can excute `clusterRile.yml` to create a cluster role binding for default service account.
>
> Namespace is modify to `discovery.yml` in `openim-config.yml`, you can change `kubernetes.namespace` to your namespace.
**Excute `clusterRole.yml`**
`kubectl apply -f ./clusterRole.yml`
### run all deployments and services
> Note: Ensure that infrastructure services like MinIO, Redis, and Kafka are running before deploying the main applications.
```bash
kubectl apply \
-f openim-api-deployment.yml \
-f openim-api-service.yml \
-f openim-crontask-deployment.yml \
-f openim-rpc-user-deployment.yml \
-f openim-rpc-user-service.yml \
-f openim-msggateway-deployment.yml \
-f openim-msggateway-service.yml \
-f openim-push-deployment.yml \
-f openim-push-service.yml \
-f openim-msgtransfer-service.yml \
-f openim-msgtransfer-deployment.yml \
-f openim-rpc-conversation-deployment.yml \
-f openim-rpc-conversation-service.yml \
-f openim-rpc-auth-deployment.yml \
-f openim-rpc-auth-service.yml \
-f openim-rpc-group-deployment.yml \
-f openim-rpc-group-service.yml \
-f openim-rpc-friend-deployment.yml \
-f openim-rpc-friend-service.yml \
-f openim-rpc-msg-deployment.yml \
-f openim-rpc-msg-service.yml \
-f openim-rpc-third-deployment.yml \
-f openim-rpc-third-service.yml
```
### Verification
After deploying the services, verify that everything is running smoothly:
```bash
# Check the status of all pods
kubectl get pods
# Check the status of services
kubectl get svc
# Check the status of deployments
kubectl get deployments
# View all resources
kubectl get all
```
### clean all
`kubectl delete -f ./`
### Notes:
- If you use a specific namespace for your deployment, be sure to append the -n <namespace> flag to your kubectl commands.

View File

@ -1,85 +0,0 @@
# Kubernetes Deployment
## Resource Requests
- CPU: 2 cores
- Memory: 4 GiB
- Disk usage: 20 GiB (on Node)
## Origin Deploy
1. Enter the target dir
`cd ./deployments/deploy/`
2. Deploy configs and dependencies
Upate your `openim-config.yml`
Apply all config and dependencies
`kubectl apply -f ./openim-config.yml`
> Attation: If you use `default` namespace, you can excute `clusterRile.yml` to create a cluster role binding for default service account.
>
> Namespace is modify to `discovery.yml` in `openim-config.yml`, you can change `kubernetes.namespace` to your namespace.
Excute `clusterRole.yml`
`kubectl apply -f ./clusterRole.yml`
Run infrasturcture components.
`kubectl apply -f minio-service.yml -f minio-statefulset.yml -f mongo-service.yml -f mongo-statefulset.yml -f redis-service.yml -f redis-statefulset.yml -f kafka-service.yml -f kafka-statefulset.yml`
> Note: Ensure that infrastructure services like MinIO, Redis, and Kafka are running before deploying the main applications.
3. run all deployments and services
```bash
kubectl apply \
-f openim-api-deployment.yml \
-f openim-api-service.yml \
-f openim-crontask-deployment.yml \
-f openim-rpc-user-deployment.yml \
-f openim-rpc-user-service.yml \
-f openim-msggateway-deployment.yml \
-f openim-msggateway-service.yml \
-f openim-push-deployment.yml \
-f openim-push-service.yml \
-f openim-msgtransfer-service.yml \
-f openim-msgtransfer-deployment.yml \
-f openim-rpc-conversation-deployment.yml \
-f openim-rpc-conversation-service.yml \
-f openim-rpc-auth-deployment.yml \
-f openim-rpc-auth-service.yml \
-f openim-rpc-group-deployment.yml \
-f openim-rpc-group-service.yml \
-f openim-rpc-friend-deployment.yml \
-f openim-rpc-friend-service.yml \
-f openim-rpc-msg-deployment.yml \
-f openim-rpc-msg-service.yml \
-f openim-rpc-third-deployment.yml \
-f openim-rpc-third-service.yml
```
4. Verification
After deploying the services, verify that everything is running smoothly:
```bash
# Check the status of all pods
kubectl get pods
# Check the status of services
kubectl get svc
# Check the status of deployments
kubectl get deployments
# View all resources
kubectl get all
```
5. clean all
`kubectl delete -f ./`
### Notes:
- If you use a specific namespace for your deployment, be sure to append the -n <namespace> flag to your kubectl commands.

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: Secret
metadata:
name: openim-kafka-secret
type: Opaque
data:
kafka-password: ""

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: openim-minio-secret
type: Opaque
data:
minio-root-user: cm9vdA== # Base64 encoded "root"
minio-root-password: b3BlbklNMTIz # Base64 encoded "openIM123"

View File

@ -31,12 +31,12 @@ spec:
- name: MINIO_ROOT_USER
valueFrom:
secretKeyRef:
name: minio-secret
name: openim-minio-secret
key: minio-root-user
- name: MINIO_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: minio-secret
name: openim-minio-secret
key: minio-root-password
command:
- "/bin/sh"
@ -76,12 +76,4 @@ spec:
requests:
storage: 2Gi
---
apiVersion: v1
kind: Secret
metadata:
name: minio-secret
type: Opaque
data:
minio-root-user: cm9vdA== # Base64 encoded "root"
minio-root-password: b3BlbklNMTIz # Base64 encoded "openIM123"

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: openim-mongo-secret
type: Opaque
data:
mongo_openim_username: b3BlbklN # base64 for "openIM", this user credentials need in authSource database.
mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123"

View File

@ -47,27 +47,27 @@ spec:
- name: MONGO_INITDB_ROOT_USERNAME
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-init-secret
key: mongo_initdb_root_username
- name: MONGO_INITDB_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-init-secret
key: mongo_initdb_root_password
- name: MONGO_INITDB_DATABASE
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-init-secret
key: mongo_initdb_database
- name: MONGO_OPENIM_USERNAME
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-init-secret
key: mongo_openim_username
- name: MONGO_OPENIM_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-init-secret
key: mongo_openim_password
- name: TZ
value: "Asia/Shanghai"
@ -93,3 +93,16 @@ spec:
resources:
requests:
storage: 5Gi
---
apiVersion: v1
kind: Secret
metadata:
name: openim-mongo-init-secret
type: Opaque
data:
mongo_initdb_root_username: cm9vdA== # base64 for "root"
mongo_initdb_root_password: b3BlbklNMTIz # base64 for "openIM123"
mongo_initdb_database: b3BlbmltX3Yz # base64 for "openim_v3"
mongo_openim_username: b3BlbklN # base64 for "openIM"
mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123"

View File

@ -21,20 +21,18 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
key: mongo_openim_password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_password
volumeMounts:
- name: openim-config
@ -46,4 +44,4 @@ spec:
volumes:
- name: openim-config
configMap:
name: openim-config
name: openim-config

View File

@ -4,7 +4,7 @@ metadata:
name: openim-config
data:
discovery.yml: |
enable: "kubernetes"
enable: "kubernetes" # "kubernetes" or "etcd"
kubernetes:
namespace: default
etcd:
@ -26,7 +26,6 @@ data:
log.yml: |
# Log storage path, default is acceptable, change to a full path if modification is needed
# storageLocation: ../../../../logs/
storageLocation: ./logs/
# Log rotation period (in hours), default is acceptable
rotationTime: 24
@ -49,9 +48,9 @@ data:
# Name of the database
database: openim_v3
# Username for database authentication
username: openIM
username: '' # openIM
# Password for database authentication
password: openIM123
password: '' # openIM123
# Authentication source for database authentication, if use root user, set it to admin
authSource: openim_v3
# Maximum number of connections in the connection pool
@ -1055,16 +1054,3 @@ data:
- targets: [ internal_ip:12320 ]
labels:
namespace: default
---
apiVersion: v1
kind: Secret
metadata:
name: mongo-secret
type: Opaque
data:
mongo_initdb_root_username: cm9vdA== # base64 for "root"
mongo_initdb_root_password: b3BlbklNMTIz # base64 for "openIM123"
mongo_initdb_database: b3BlbmltX3Yz # base64 for "openim_v3"
mongo_openim_username: b3BlbklN # base64 for "openIM"
mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123"

View File

@ -21,7 +21,7 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
volumeMounts:
- name: openim-config

View File

@ -21,13 +21,23 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
- name: IMENV_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: openim-kafka-secret
key: kafka-password
volumeMounts:
- name: openim-config
mountPath: "/config"

View File

@ -21,8 +21,13 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: openim-kafka-secret
key: kafka-password
volumeMounts:
- name: openim-config
mountPath: "/config"

View File

@ -22,7 +22,7 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
volumeMounts:
- name: openim-config

View File

@ -21,12 +21,17 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
volumeMounts:
- name: openim-config

View File

@ -14,19 +14,24 @@ spec:
spec:
containers:
- name: friend-rpc-server-container
image: openim/openim-rpc-friend:v3.8.3
image: openim/openim-rpc-friend:v3.8.3
env:
- name: CONFIG_PATH
value: "/config"
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
volumeMounts:
- name: openim-config
@ -38,4 +43,4 @@ spec:
volumes:
- name: openim-config
configMap:
name: openim-config
name: openim-config

View File

@ -15,19 +15,23 @@ spec:
containers:
- name: group-rpc-server-container
image: openim/openim-rpc-group:v3.8.3
env:
- name: CONFIG_PATH
value: "/config"
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
volumeMounts:
- name: openim-config
@ -39,4 +43,4 @@ spec:
volumes:
- name: openim-config
configMap:
name: openim-config
name: openim-config

View File

@ -14,20 +14,30 @@ spec:
spec:
containers:
- name: msg-rpc-server-container
image: openim/openim-rpc-msg:v3.8.3
image: openim/openim-rpc-msg:v3.8.3
env:
- name: CONFIG_PATH
value: "/config"
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
- name: IMENV_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: openim-kafka-secret
key: kafka-password
volumeMounts:
- name: openim-config
mountPath: "/config"
@ -38,4 +48,4 @@ spec:
volumes:
- name: openim-config
configMap:
name: openim-config
name: openim-config

View File

@ -14,29 +14,34 @@ spec:
spec:
containers:
- name: third-rpc-server-container
image: openim/openim-rpc-third:v3.8.3
image: openim/openim-rpc-third:v3.8.3
env:
- name: CONFIG_PATH
value: "/config"
- name: IMENV_MINIO_ACCESSKEYID
valueFrom:
secretKeyRef:
name: minio-secret
name: openim-minio-secret
key: minio-root-user
- name: IMENV_MINIO_SECRETACCESSKEY
valueFrom:
secretKeyRef:
name: minio-secret
name: openim-minio-secret
key: minio-root-password
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
volumeMounts:
- name: openim-config

View File

@ -21,13 +21,23 @@ spec:
- name: IMENV_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
name: openim-redis-secret
key: redis-password
- name: IMENV_MONGODB_USERNAME
valueFrom:
secretKeyRef:
name: openim-mongo-secret
key: mongo_openim_username
- name: IMENV_MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: mongo-secret
name: openim-mongo-secret
key: mongo_openim_password
- name: IMENV_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: openim-kafka-secret
key: kafka-password
volumeMounts:
- name: openim-config
mountPath: "/config"

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: Secret
metadata:
name: openim-redis-secret
type: Opaque
data:
redis-password: b3BlbklNMTIz # "openIM123" in base64

View File

@ -29,9 +29,6 @@ spec:
volumeMounts:
- name: redis-data
mountPath: /data
# - name: redis-config-volume
# mountPath: /usr/local/redis/config/redis.conf
# subPath: redis.conf
command:
[
"/bin/sh",
@ -56,11 +53,3 @@ spec:
resources:
requests:
storage: 5Gi
---
apiVersion: v1
kind: Secret
metadata:
name: redis-secret
type: Opaque
data:
redis-password: b3BlbklNMTIz # "openIM123" in base64

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.68
github.com/openimsdk/tools v0.0.50-alpha.61
github.com/openimsdk/tools v0.0.50-alpha.62
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

4
go.sum
View File

@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.68 h1:Ekn6S9Ftt12Xs/p9kJ39RDr2gSwIczz+MmSHQE4lAek=
github.com/openimsdk/protocol v0.0.72-alpha.68/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.61 h1:zKEZwrj+fUVuyC6KR3kZp9zFaCCIFgoSbHO0r0mZ6h4=
github.com/openimsdk/tools v0.0.50-alpha.61/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U=
github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ=
github.com/openimsdk/tools v0.0.50-alpha.62/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -108,11 +108,11 @@ func Start(ctx context.Context, index int, config *Config) error {
cm.Watch(ctx)
}
msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err

View File

@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(
ctx,
"remove cache msg from redis err",
err,
"msg",
msgFromMQ.MsgData,
"conversationID",
msgFromMQ.ConversationID,
)
}
}
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // an instance in the consumer group
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
for msg := range claim.Messages() {

View File

@ -106,10 +106,8 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
return nil, err
}
remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp
for _, conversationID := range req.ConversationIDs {
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, remainTime); err != nil {
log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err)
}
if _, err := m.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: remainTime, Limit: 9999}); err != nil {
return nil, err
}
return &msg.DeleteMsgPhysicalResp{}, nil
}

View File

@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
var role int32
if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) {
switch msgs[0].SessionType {
sessionType := msgs[0].SessionType
switch sessionType {
case constant.SingleChatType:
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil {
return nil, err
@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
role = member.RoleLevel
}
default:
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported")
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType)
}
}
now := time.Now().UnixMilli()

View File

@ -101,14 +101,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
ConversationType: msg.SessionType,
GroupID: msg.GroupID,
}
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "GetGroupMemberIDs", err)
return
}
tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...)
if tagAll {
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "GetGroupMemberIDs", err)
return
}
memberUserIDList = datautil.DeleteElems(memberUserIDList, msg.SendID)
@ -118,6 +118,9 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
} else { // @Everyone and @other people
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
atUserID = datautil.SliceIntersectFuncs(atUserID, memberUserIDList, func(a string) string { return a }, func(b string) string {
return b
})
if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
}
@ -131,10 +134,13 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
return
}
atUserID = datautil.SliceIntersectFuncs(msg.AtUserIDList, memberUserIDList, func(a string) string { return a }, func(b string) string {
return b
})
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
if err := m.conversationClient.SetConversations(ctx, msg.AtUserIDList, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, atUserID, conversation)
}
}

View File

@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb)
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err

View File

@ -15,50 +15,16 @@
package cachekey
import (
"github.com/openimsdk/protocol/constant"
"strconv"
)
const (
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
exTypeKeyLocker = "EX_LOCK:"
reactionExSingle = "EX_SINGLE_"
reactionWriteGroup = "EX_GROUP_"
reactionReadGroup = "EX_SUPER_GROUP_"
reactionNotification = "EX_NOTIFICATION_"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
messageCache = "MSG_CACHE:"
)
func GetMessageCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
}
func GetMessageDelUserListKey(conversationID string, seq int64) string {
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
}
func GetUserDelListKey(conversationID, userID string) string {
return userDelMessagesList + conversationID + ":" + userID
}
func GetMessageReactionExKey(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return reactionExSingle + clientMsgID
case constant.WriteGroupChatType:
return reactionWriteGroup + clientMsgID
case constant.ReadGroupChatType:
return reactionReadGroup + clientMsgID
case constant.NotificationChatType:
return reactionNotification + clientMsgID
}
return ""
}
func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string {
return exTypeKeyLocker + clientMsgID + "_" + TypeKey
func GetMsgCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
}
func GetSendMsgKey(id string) string {

View File

@ -16,23 +16,14 @@ package cache
import (
"context"
"time"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
type MsgCache interface {
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error
}

View File

@ -2,10 +2,12 @@ package redis
import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
@ -13,76 +15,26 @@ import (
) //
// msgCacheTimeout is expiration time of message cache, 86400 seconds
const msgCacheTimeout = 86400
const msgCacheTimeout = time.Hour * 24
func NewMsgCache(client redis.UniversalClient) cache.MsgCache {
return &msgCache{rdb: client}
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
return &msgCache{
rdb: client,
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
msgDocDatabase: db,
}
}
type msgCache struct {
rdb redis.UniversalClient
}
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return cachekey.GetMessageCacheKey(conversationID, seq)
}
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
return cachekey.GetMessageDelUserListKey(conversationID, seq)
}
func (c *msgCache) getUserDelList(conversationID, userID string) string {
return cachekey.GetUserDelListKey(conversationID, userID)
rdb redis.UniversalClient
rcClient *rockscache.Client
msgDocDatabase database.Msg
}
func (c *msgCache) getSendMsgKey(id string) string {
return cachekey.GetSendMsgKey(id)
}
func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string {
return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey)
}
func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
return cachekey.GetMessageReactionExKey(clientMsgID, sessionType)
}
func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
var values []string
for _, key := range keys {
if msg, ok := msgMap[key]; ok {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return err
}
values = append(values, s)
}
}
return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout)
})
if err != nil {
return 0, err
}
return len(msgs), nil
}
func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMessageCacheKey(conversationID, seq))
}
return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
return LuaDeleteBatch(ctx, c.rdb, keys)
})
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
}
@ -92,81 +44,53 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
return int32(result), errs.Wrap(err)
}
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
getKey := func(seq int64) string {
return cachekey.GetMsgCacheKey(conversationID, seq)
}
getMsgID := func(msg *model.MsgInfoModel) int64 {
return msg.Msg.Seq
}
find := func(ctx context.Context, seqs []int64) ([]*model.MsgInfoModel, error) {
return c.msgDocDatabase.FindSeqs(ctx, conversationID, seqs)
}
return batchGetCache2(ctx, c.rcClient, msgCacheTimeout, seqs, getKey, getMsgID, find)
}
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
return errs.Wrap(c.rdb.Del(ctx, key).Err())
}
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
keys := datautil.Slice(seqs, func(seq int64) string {
return cachekey.GetMsgCacheKey(conversationID, seq)
})
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
if err != nil {
return false, errs.Wrap(err)
return err
}
return n > 0, nil
}
func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
var keys []string
keySeqMap := make(map[string]int64, 10)
for _, seq := range seqs {
key := c.getMessageCacheKey(conversationID, seq)
keys = append(keys, key)
keySeqMap[key] = seq
for _, keys := range slotKeys {
if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil {
return err
}
}
err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
result, err := LuaGetBatch(ctx, c.rdb, keys)
return nil
}
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error {
for _, msg := range msgs {
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
continue
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
for i, value := range result {
seq := keySeqMap[keys[i]]
if value == nil {
failedSeqs = append(failedSeqs, seq)
continue
}
msg := &sdkws.MsgData{}
msgString, ok := value.(string)
if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, msg)
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
return err
}
return nil
})
if err != nil {
return nil, nil, err
}
return seqMsgs, failedSeqs, nil
return nil
}

View File

@ -1,133 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"fmt"
"github.com/openimsdk/protocol/sdkws"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"testing"
)
func Test_msgCache_SetMessagesToCache(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
msgs []*sdkws.MsgData
}
tests := []struct {
name string
fields fields
args args
want int
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Username: "", Password: "openIM123", DB: 0})}, args{context.Background(),
"cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs)
if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) {
return
}
assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)
})
}
}
func Test_msgCache_GetMessagesBySeq(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
seqs []int64
}
var failedSeq []int64
tests := []struct {
name string
fields fields
args args
wantSeqMsgs []*sdkws.MsgData
wantFailedSeqs []int64
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
args{context.Background(), "cid", []int64{1, 2, 3}},
[]*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, failedSeq, assert.NoError},
{"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
args{context.Background(), "cid", []int64{4, 5, 6}},
nil, []int64{4, 5, 6}, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs)
if !tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) {
return
}
equalMsgDataSlices(t, tt.wantSeqMsgs, gotSeqMsgs)
assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)
})
}
}
func equalMsgDataSlices(t *testing.T, expected, actual []*sdkws.MsgData) {
assert.Equal(t, len(expected), len(actual), "Slices have different lengths")
for i := range expected {
assert.True(t, proto.Equal(expected[i], actual[i]), "Element %d not equal: expected %v, got %v", i, expected[i], actual[i])
}
}
func Test_msgCache_DeleteMessagesFromCache(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
seqs []int64
}
tests := []struct {
name string
fields fields
args args
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123"})},
args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs),
fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs))
})
}
}

View File

@ -18,6 +18,9 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/utils/jsonutil"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@ -36,7 +39,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
)
const (
@ -54,12 +56,8 @@ type CommonMsgDatabase interface {
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
// GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers.
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis
// cache).
GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error)
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
@ -80,8 +78,6 @@ type CommonMsgDatabase interface {
GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error)
GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
//GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error)
@ -92,10 +88,6 @@ type CommonMsgDatabase interface {
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// get Msg when destruct msg before
//DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
@ -118,7 +110,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
}
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
msgCache: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producer: producerToRedis,
@ -128,7 +120,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
type commonMsgDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
msgCache cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
@ -139,7 +131,7 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
return err
}
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 {
return nil
}
@ -237,11 +229,15 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially
i += insert - 1 // Skip the inserted data
}
return nil
}
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error {
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil {
return err
}
return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq})
}
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error {
@ -256,23 +252,17 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
return err
}
}
return nil
return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs)
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil {
return nil, err
}
for _, msg := range msgs {
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
}
}
return totalMsgs, nil
return db.GetMessageBySeqs(ctx, conversationID, userID, seqs)
}
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) {
if msg == nil || msg.Msg == nil {
return
}
if msg.IsRead {
msg.Msg.IsRead = true
}
@ -360,9 +350,6 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][
return
}
msg.Msg.Content = string(data)
//if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil {
// log.ZError(ctx, "UpdateMsgContent", err)
//}
}
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) {
@ -377,24 +364,6 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID
return msgs, err
}
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil {
return nil, err
}
for _, msg := range msgs {
if msg.IsRead {
msg.Msg.IsRead = true
}
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg))
}
}
return seqMsgs, nil
}
// GetMsgBySeqsRange In the context of group chat, we have the following parameters:
//
// "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation.
@ -463,37 +432,10 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
seqs = append(seqs, i)
}
}
if len(seqs) == 0 {
return 0, 0, nil, nil
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, seqs)
if err != nil {
return 0, 0, nil, err
}
newBegin := seqs[0]
newEnd := seqs[len(seqs)-1]
var successMsgs []*sdkws.MsgData
log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd)
cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil && !errors.Is(err, redis.Nil) {
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
}
successMsgs = append(successMsgs, cachedMsgs...)
log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs)
// get from cache or db
if len(failedSeqs) > 0 {
log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs)
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
if err != nil {
return 0, 0, nil, err
}
successMsgs = append(mongoMsgs, successMsgs...)
//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs)
//if err != nil {
// return 0, 0, nil, err
//}
}
return minSeq, maxSeq, successMsgs, nil
}
@ -529,31 +471,9 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
newSeqs = append(newSeqs, seq)
}
}
if len(newSeqs) == 0 {
return minSeq, maxSeq, nil, nil
}
successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs)
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs)
if err != nil {
if !errors.Is(err, redis.Nil) {
log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
}
}
log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs",
seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs)
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil {
return 0, 0, nil, err
}
successMsgs = append(successMsgs, mongoMsgs...)
//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs)
//if err != nil {
// return 0, 0, nil, err
//}
return 0, 0, nil, err
}
return minSeq, maxSeq, successMsgs, nil
}
@ -607,174 +527,14 @@ func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, use
if len(newSeqs) == 0 {
return isEnd, endSeq, nil, nil
}
successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs)
successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs)
if err != nil {
if !errors.Is(err, redis.Nil) {
log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
}
}
log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs",
seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs)
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil {
return false, 0, nil, err
}
successMsgs = append(successMsgs, mongoMsgs...)
//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs)
//if err != nil {
// return 0, 0, nil, err
//}
return false, 0, nil, err
}
return isEnd, endSeq, successMsgs, nil
}
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
var skip int64
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime)
if err != nil {
return err
}
log.ZDebug(ctx, "DeleteConversationMsgsAndSetMinSeq", "conversationID", conversationID, "minSeq", minSeq)
if minSeq == 0 {
return nil
}
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
}
func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) {
var index int64
for {
// from oldest 2 newest, ASC
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == model.ErrMsgListNotExist {
log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
break
}
index++
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli()
if len(msgDocModel.Msg) > 0 {
i := 0
var over bool
for _, msg := range msgDocModel.Msg {
i++
// over clear time, need to clear
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() {
// if msg is not in del list, add to del list
if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq)
}
} else {
log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
over = true
break
}
}
if over {
break
}
}
}
log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs)
// have msg need to destruct
if len(seqs) > 0 {
// update min seq to clear after
userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before
if err != nil {
return nil, err
}
// if before < after, update min seq
if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
}
return seqs, nil
}
// this is struct for recursion.
type delMsgRecursionStruct struct {
minSeq int64
delDocIDs []string
}
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
return d.minSeq
}
// index 0....19(del) 20...69
// seq 70
// set minSeq 21
// recursion deletes the list and returns the set minimum seq.
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == model.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "length", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
}
if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() {
log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID)
delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID)
delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq
} else {
var delMsgIndexs []int
for i, MsgInfoModel := range msgDocModel.Msg {
if MsgInfoModel != nil && MsgInfoModel.Msg != nil {
if timeutil.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) {
delMsgIndexs = append(delMsgIndexs, i)
}
}
}
if len(delMsgIndexs) > 0 {
if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index)
}
delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq)
}
}
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
}
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil {
return err
}
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
var indexes []int
for _, seq := range seqs {
@ -784,13 +544,10 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
return err
}
}
return nil
return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs)
}
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil {
return err
}
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
for _, seq := range seqs {
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {
@ -798,7 +555,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st
}
}
}
return nil
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
@ -809,11 +566,6 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin
return db.seqConversation.GetMaxSeq(ctx, conversationID)
}
//
//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
//}
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return db.seqConversation.SetMinSeqs(ctx, seqs)
}
@ -847,11 +599,11 @@ func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, c
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.msg.SetSendMsgStatus(ctx, id, status)
return db.msgCache.SetSendMsgStatus(ctx, id, status)
}
func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.msg.GetSendMsgStatus(ctx, id)
return db.msgCache.GetSendMsgStatus(ctx, id)
}
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
@ -888,26 +640,11 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
return
}
func (db *commonMsgDatabase) RangeUserSendCount(
ctx context.Context,
start time.Time,
end time.Time,
group bool,
ase bool,
pageNumber int32,
showNumber int32,
) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber)
}
func (db *commonMsgDatabase) RangeGroupSendCount(
ctx context.Context,
start time.Time,
end time.Time,
ase bool,
pageNumber int32,
showNumber int32,
) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) {
func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) {
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
}
@ -947,43 +684,10 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID
return totalMsgs, nil
}
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}
func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit)
}
//
//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
// var notNull int
// index := make([]int, 0, len(doc.Msg))
// for i, message := range doc.Msg {
// if message.Msg != nil {
// notNull++
// if message.Msg.SendTime < ts {
// index = append(index, i)
// }
// }
// }
// if len(index) == 0 {
// return index, nil
// }
// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
// return index, err
// }
// if len(index) == notNull {
// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
// } else {
// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
// }
//}
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
@ -998,10 +702,6 @@ func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID strin
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
}
func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
return db.msgDocDatabase.GetRandDocIDs(ctx, limit)
}
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs)
}
@ -1016,9 +716,103 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
}
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
return db.msgDocDatabase.DeleteDoc(ctx, docID)
index := strings.LastIndex(docID, ":")
if index <= 0 {
return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID)
}
index, err := strconv.Atoi(docID[index+1:])
if err != nil {
return errs.WrapMsg(err, "strconv.Atoi", "docID", docID)
}
conversationID := docID[:index]
seqs := make([]int64, db.msgTable.GetSingleGocMsgNum())
minSeq := db.msgTable.GetMinSeq(index)
for i := range seqs {
seqs[i] = minSeq + int64(i)
}
if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil {
return err
}
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
}
func (db *commonMsgDatabase) handlerDeleteAndRevoked(ctx context.Context, userID string, msgs []*model.MsgInfoModel) {
for i := range msgs {
msg := msgs[i]
if msg == nil || msg.Msg == nil {
continue
}
msg.Msg.IsRead = msg.IsRead
if datautil.Contain(userID, msg.DelList...) {
msg.Msg.Content = ""
msg.Msg.Status = constant.MsgDeleted
}
if msg.Revoke == nil {
continue
}
msg.Msg.ContentType = constant.MsgRevokeNotification
revokeContent := sdkws.MessageRevokedContent{
RevokerID: msg.Revoke.UserID,
RevokerRole: msg.Revoke.Role,
ClientMsgID: msg.Msg.ClientMsgID,
RevokerNickname: msg.Revoke.Nickname,
RevokeTime: msg.Revoke.Time,
SourceMessageSendTime: msg.Msg.SendTime,
SourceMessageSendID: msg.Msg.SendID,
SourceMessageSenderNickname: msg.Msg.SenderNickname,
SessionType: msg.Msg.SessionType,
Seq: msg.Msg.Seq,
Ex: msg.Msg.Ex,
}
data, err := jsonutil.JsonMarshal(&revokeContent)
if err != nil {
log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal MessageRevokedContent", err, "msg", msg)
continue
}
elem := sdkws.NotificationElem{
Detail: string(data),
}
content, err := jsonutil.JsonMarshal(&elem)
if err != nil {
log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal NotificationElem", err, "msg", msg)
continue
}
msg.Msg.Content = string(content)
}
}
func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversationID string, msgs []*model.MsgInfoModel) {
temp := make(map[int64][]*model.MsgInfoModel)
for i := range msgs {
db.handlerDBMsg(ctx, temp, userID, conversationID, msgs[i])
}
}
func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs)
if err != nil {
return nil, err
}
db.handlerDeleteAndRevoked(ctx, userID, msgs)
db.handlerQuote(ctx, userID, conversationID, msgs)
seqMsgs := make(map[int64]*model.MsgInfoModel)
for i, msg := range msgs {
if msg.Msg == nil {
continue
}
seqMsgs[msg.Msg.Seq] = msgs[i]
}
res := make([]*sdkws.MsgData, 0, len(seqs))
for _, seq := range seqs {
if v, ok := seqMsgs[seq]; ok {
res = append(res, convert.MsgDB2Pb(v.Msg))
} else {
res = append(res, &sdkws.MsgData{Seq: seq})
}
}
return res, nil
}

View File

@ -2,10 +2,11 @@ package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -50,7 +51,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse
}
return &msgTransferDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
msgCache: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producerToMongo: producerToMongo,
@ -61,7 +62,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse
type msgTransferDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
msgCache cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producerToMongo *kafka.Producer
@ -73,10 +74,12 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat
return errs.ErrArgs.WrapMsg("msgList is empty")
}
msgs := make([]any, len(msgList))
seqs := make([]int64, len(msgList))
for i, msg := range msgList {
if msg == nil {
continue
}
seqs[i] = msg.Seq
var offlinePushModel *model.OfflinePushModel
if msg.OfflinePushInfo != nil {
offlinePushModel = &model.OfflinePushModel{
@ -114,7 +117,11 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat
Ex: msg.Ex,
}
}
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
if err := db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq); err != nil {
return err
}
//return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
return nil
}
func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
@ -219,7 +226,7 @@ func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversatio
}
func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
}
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) {
@ -238,20 +245,22 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
isNew = currentMaxSeq == 0
lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64)
seqs := make([]int64, 0, lenList)
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
userSeqMap[m.SendID] = m.Seq
seqs = append(seqs, m.Seq)
}
failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
if err != nil {
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prommetrics.MsgInsertRedisSuccessCounter.Inc()
msgToDB := func(msg *sdkws.MsgData) *model.MsgInfoModel {
return &model.MsgInfoModel{
Msg: convert.MsgPb2DB(msg),
}
}
return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err)
if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, msgToDB)); err != nil {
return 0, false, nil, err
}
return lastMaxSeq, isNew, userSeqMap, nil
}
func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {

View File

@ -7,15 +7,12 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/utils/datautil"
"golang.org/x/exp/rand"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@ -42,12 +39,6 @@ type MsgMgo struct {
model model.MsgDocModel
}
func (m *MsgMgo) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error {
filter := bson.M{"doc_id": docID}
update := bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}}
return mongoutil.UpdateOne(ctx, m.coll, filter, update, false)
}
func (m *MsgMgo) Create(ctx context.Context, msg *model.MsgDocModel) error {
return mongoutil.InsertMany(ctx, m.coll, []*model.MsgDocModel{msg})
}
@ -80,16 +71,6 @@ func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key
return mongoutil.UpdateOneResult(ctx, m.coll, filter, update)
}
func (m *MsgMgo) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
filter := bson.M{"doc_id": docID}
update := bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}
return mongoutil.UpdateOne(ctx, m.coll, filter, update, false)
}
func (m *MsgMgo) IsExistDocID(ctx context.Context, docID string) (bool, error) {
return mongoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID})
}
func (m *MsgMgo) FindOneByDocID(ctx context.Context, docID string) (*model.MsgDocModel, error) {
return mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID})
}
@ -218,13 +199,6 @@ func (m *MsgMgo) GetOldestMsg(ctx context.Context, conversationID string) (*mode
}
}
func (m *MsgMgo) DeleteDocs(ctx context.Context, docIDs []string) error {
if len(docIDs) == 0 {
return nil
}
return mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}})
}
func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error) {
if sort != 1 && sort != -1 {
return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1")
@ -279,95 +253,6 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do
return nil
}
//func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) {
//
// return nil, nil
//}
//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) {
// var pipeline bson.A
// if !nextID.IsZero() {
// pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
// }
// pipeline = append(pipeline,
// bson.M{"$match": filter},
// bson.M{"$limit": limit},
// bson.M{"$unwind": "$msgs"},
// bson.M{"$match": filter},
// bson.M{
// "$group": bson.M{
// "_id": "$_id",
// "doc_id": bson.M{
// "$first": "$doc_id",
// },
// "msgs": bson.M{"$push": "$msgs"},
// },
// },
// )
// if !content {
// pipeline = append(pipeline,
// bson.M{
// "$project": bson.M{
// "_id": 1,
// "count": bson.M{"$size": "$msgs"},
// },
// },
// )
// type result struct {
// ID primitive.ObjectID `bson:"_id"`
// Count int64 `bson:"count"`
// }
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, primitive.ObjectID{}, err
// }
// if len(res) == 0 {
// return 0, nil, primitive.ObjectID{}, nil
// }
// var count int64
// for _, r := range res {
// count += r.Count
// }
// return count, nil, res[len(res)-1].ID, nil
// }
// type result struct {
// ID primitive.ObjectID `bson:"_id"`
// Msg []*model.MsgInfoModel `bson:"msgs"`
// }
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, primitive.ObjectID{}, err
// }
// if len(res) == 0 {
// return 0, nil, primitive.ObjectID{}, err
// }
// var count int
// for _, r := range res {
// count += len(r.Msg)
// }
// msgs := make([]*model.MsgInfoModel, 0, count)
// for _, r := range res {
// msgs = append(msgs, r.Msg...)
// }
// return int64(count), msgs, res[len(res)-1].ID, nil
//}
/*
db.msg3.aggregate(
[
{
"$match": {
"doc_id": "si_7009965934_8710838466:0"
},
}
]
)
*/
type searchMessageIndex struct {
ID primitive.ObjectID `bson:"_id"`
Index []int64 `bson:"index"`
@ -512,22 +397,6 @@ func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (
}
}
func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) {
if len(index) == 0 {
return nil, nil
}
pipeline := bson.A{
bson.M{"$match": bson.M{"_id": id}},
bson.M{"$project": "$msgs"},
}
msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
return msgs, nil
}
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) {
count, data, err := m.searchMessage(ctx, req)
if err != nil {
@ -556,143 +425,6 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (
return count, msgs, nil
}
//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
// where := make(bson.A, 0, 6)
// if req.RecvID != "" {
// if req.SessionType == constant.ReadGroupChatType {
// where = append(where, bson.M{
// "$or": bson.A{
// bson.M{"doc_id": "^n_" + req.RecvID + ":"},
// bson.M{"doc_id": "^sg_" + req.RecvID + ":"},
// },
// })
// } else {
// where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
// }
// }
// if req.SendID != "" {
// where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
// }
// if req.ContentType != 0 {
// where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
// }
// if req.SessionType != 0 {
// where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
// }
// if req.SendTime != "" {
// sendTime, err := time.Parse(time.DateOnly, req.SendTime)
// if err != nil {
// return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
// }
// where = append(where,
// bson.M{
// "msgs.msg.send_time": bson.M{
// "$gte": sendTime.UnixMilli(),
// },
// },
// bson.M{
// "msgs.msg.send_time": bson.M{
// "$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
// },
// },
// )
// }
// opt := options.Find().SetLimit(100)
// res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt)
// if err != nil {
// return 0, nil, err
// }
// _ = res
// fmt.Println()
//
// return 0, nil, nil
// pipeline := bson.A{
// bson.M{
// "$unwind": "$msgs",
// },
// }
// if len(where) > 0 {
// pipeline = append(pipeline, bson.M{
// "$match": bson.M{"$and": where},
// })
// }
// pipeline = append(pipeline,
// bson.M{
// "$project": bson.M{
// "_id": 0,
// "msg": "$msgs.msg",
// },
// },
// bson.M{
// "$count": "count",
// },
// )
// //count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
// //if err != nil {
// // return 0, nil, err
// //}
// //if len(count) == 0 || count[0] == 0 {
// // return 0, nil, nil
// //}
// count := []int32{0}
// pipeline = pipeline[:len(pipeline)-1]
// pipeline = append(pipeline,
// bson.M{
// "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
// },
// bson.M{
// "$limit": req.Pagination.GetShowNumber(),
// },
// )
// msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, err
// }
// for i := range msgs {
// msgInfo := msgs[i]
// if msgInfo == nil || msgInfo.Msg == nil {
// continue
// }
// if msgInfo.Revoke != nil {
// revokeContent := sdkws.MessageRevokedContent{
// RevokerID: msgInfo.Revoke.UserID,
// RevokerRole: msgInfo.Revoke.Role,
// ClientMsgID: msgInfo.Msg.ClientMsgID,
// RevokerNickname: msgInfo.Revoke.Nickname,
// RevokeTime: msgInfo.Revoke.Time,
// SourceMessageSendTime: msgInfo.Msg.SendTime,
// SourceMessageSendID: msgInfo.Msg.SendID,
// SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
// SessionType: msgInfo.Msg.SessionType,
// Seq: msgInfo.Msg.Seq,
// Ex: msgInfo.Msg.Ex,
// }
// data, err := jsonutil.JsonMarshal(&revokeContent)
// if err != nil {
// return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
// }
// elem := sdkws.NotificationElem{Detail: string(data)}
// content, err := jsonutil.JsonMarshal(&elem)
// if err != nil {
// return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
// }
// msgInfo.Msg.ContentType = constant.MsgRevokeNotification
// msgInfo.Msg.Content = string(content)
// }
// }
// //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
// //n := int32(len(msgs))
// //if start >= n {
// // return n, []*relation.MsgInfoModel{}, nil
// //}
// //if start+req.Pagination.ShowNumber < n {
// // msgs = msgs[start : start+req.Pagination.ShowNumber]
// //} else {
// // msgs = msgs[start:]
// //}
// return count[0], msgs, nil
//}
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
var sort int
if ase {
@ -1178,94 +910,6 @@ func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end t
return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil
}
func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
msgDocs, err := mongoutil.Find[*model.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex})
if err != nil {
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
continue
}
if len(msgDocs) < 1 {
continue
}
log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
if err := mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil {
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
continue
}
var newMsgDocs []any
for _, msgDoc := range msgDocs {
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
continue
}
var index int64
for index < int64(len(msgDoc.Msg)) {
msg := msgDoc.Msg[index]
if msg != nil && msg.Msg != nil {
msgDocModel := model.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
end := index + m.model.GetSingleGocMsgNum()
if int(end) >= len(msgDoc.Msg) {
msgDocModel.Msg = msgDoc.Msg[index:]
} else {
msgDocModel.Msg = msgDoc.Msg[index:end]
}
newMsgDocs = append(newMsgDocs, msgDocModel)
index = end
} else {
break
}
}
}
if err = mongoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil {
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
} else {
log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
}
}
}
}
func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
var skip int
var docIDs []string
var offset int
count, err := m.coll.CountDocuments(ctx, bson.M{})
if err != nil {
return nil, err
}
if count < int64(limit) {
skip = 0
} else {
rand.Seed(uint64(time.Now().UnixMilli()))
skip = rand.Intn(int(count / int64(limit)))
offset = skip * limit
}
log.ZDebug(ctx, "offset", "skip", skip, "offset", offset)
res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
{
"$project": bson.M{
"doc_id": 1,
},
},
{
"$skip": offset,
},
{
"$limit": limit,
},
})
for _, doc := range res {
docIDs = append(docIDs, doc.DocID)
}
return docIDs, errs.Wrap(err)
}
func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
{
@ -1297,18 +941,6 @@ func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*
})
}
func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int) error {
if len(index) == 0 {
return nil
}
model := &model.MsgInfoModel{DelList: []string{}}
set := make(map[string]any)
for i := range index {
set[fmt.Sprintf("msgs.%d", i)] = model
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
}
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
}
@ -1364,3 +996,55 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str
}
return seq, nil
}
func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) {
if len(indexes) == 0 {
return nil, nil
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.D{
{Key: "doc_id", Value: docID},
}}},
bson.D{{Key: "$project", Value: bson.D{
{Key: "_id", Value: 0},
{Key: "doc_id", Value: 1},
{Key: "msgs", Value: bson.D{
{Key: "$map", Value: bson.D{
{Key: "input", Value: indexes},
{Key: "as", Value: "index"},
{Key: "in", Value: bson.D{
{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
}},
}},
}},
}}},
}
msgDocModel, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
if len(msgDocModel) == 0 {
return nil, nil
}
return msgDocModel[0].Msg, nil
}
func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
result := make([]*model.MsgInfoModel, 0, len(seqs))
for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
if err != nil {
return nil, err
}
for i, re := range res {
if re == nil || re.Msg == nil {
continue
}
result = append(result, res[i])
}
}
return result, nil
}

View File

@ -24,30 +24,20 @@ import (
)
type Msg interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error
Create(ctx context.Context, model *model.MsgDocModel) error
UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error)
FindOneByDocID(ctx context.Context, docID string) (*model.MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error)
GetNewestMsg(ctx context.Context, conversationID string) (*model.MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*model.MsgInfoModel, error)
DeleteDocs(ctx context.Context, docIDs []string) error
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error)
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error)
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
DeleteDoc(ctx context.Context, docID string) error
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
GetRandDocIDs(ctx context.Context, limit int) ([]string, error)
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
}

View File

@ -143,3 +143,7 @@ func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdk
}
return exceptionMsg
}
func (*MsgDocModel) GetMinSeq(index int) int64 {
return int64(index*singleGocMsgNum) + 1
}