Compare commits

...

9 Commits

Author SHA1 Message Date
xuzhijvn
3e77070239
Merge 8cd57d68535f83c6495423e777f6a30c88cbe0f1 into c208c3789fbbc2b11557330c685ba2576bb0223f 2025-07-29 16:17:12 +08:00
xuzhijvn
c208c3789f
fix: searchMessage method has potential NPE bug (#3287) (#3289) 2025-07-29 08:16:30 +00:00
chao
2d4cf99744
fix: performance issues with Kafka caused by encapsulating the MQ interface (#3485) 2025-07-28 09:10:38 +00:00
OpenIM-Gordon
b44e56b151
docs: update readme of config file. (#3356) 2025-07-28 07:51:26 +00:00
github-actions[bot]
b19b81b10e
Update CHANGELOG for release v3.8.3-patch.6 (#3473)
Co-authored-by: FGadvancer <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
2025-07-28 07:33:05 +00:00
github-actions[bot]
80de08e8ed
Update CHANGELOG for release v3.8.3-patch.5 (#3405)
Co-authored-by: mo3et <34803812+mo3et@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
2025-07-28 07:29:41 +00:00
Monet Lee
4c9fdf70db
feat: support mongo replicaset mode. (#3433)
* feat: support mongo replicaset mode.

* fix mongo config.
2025-07-28 06:22:40 +00:00
OpenIM-Gordon
d6cd0258a5
fix: correctly aggregate read seqs by conversation and user before DB update. (#3442)
* build: docker compose file add some comments.

* fix: correctly aggregate read seqs by conversation and user before DB update.
2025-07-28 04:11:38 +00:00
xuzhijvn
8cd57d6853 fix: the loop querying the db in func batchGetCache2 (#3301) 2025-04-24 15:32:41 +08:00
15 changed files with 414 additions and 191 deletions

View File

@ -1,47 +1,70 @@
## [v3.8.3-patch.4](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.4) (2025-03-13)
### Bug Fixes
* fix: solve unocrrect invite notificationfrom #3213
**Full Changelog**: [v3.8.3-patch.3...v3.8.3-patch.4](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.3...v3.8.3-patch.4)
## [v3.8.3-patch.3](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.3) (2025-03-07)
### New Features
* feat: optimizing BatchGetIncrementalGroupMember #3180
### Bug Fixes
* fix: solve uncorrect notification when set group info #3172
* fix: the sorting is wrong after canceling the administrator in group settings #3185
* fix: solve uncorrect GroupMember enter group notification type. #3188
### Refactors
* refactor: change sendNotification to sendMessage to avoid ambiguity regarding message sending behavior. #3173
**Full Changelog**: [v3.8.3-patch.2...v3.8.3-patch.3](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.2...v3.8.3-patch.3)
## [v3.8.3-patch.2](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.2) (2025-02-28)
### Bug Fixes
* fix: Offline push does not have a badge && Android offline push (#3146) [#3174](https://github.com/openimsdk/open-im-server/pull/3174)
**Full Changelog**: [v3.8.3-patch.1...v3.8.3-patch.2](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.1...v3.8.3-patch.2)
## [v3.8.3-patch.1](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.1) (2025-02-25)
### New Features
* feat: add backup volume && optimize log print [Created [#3121](https://github.com/openimsdk/open-im-server/pull/3121)
### Bug Fixes
* fix: seq conversion failed without exiting [Created [#3120](https://github.com/openimsdk/open-im-server/pull/3120)
* fix: check error in BatchSetTokenMapByUidPid [Created [#3123](https://github.com/openimsdk/open-im-server/pull/3123)
* fix: DeleteDoc crash [Created [#3124](https://github.com/openimsdk/open-im-server/pull/3124)
* fix: the abnormal message has no sending time, causing the SDK to be abnormal [Created [#3126](https://github.com/openimsdk/open-im-server/pull/3126)
* fix: crash caused [#3127](https://github.com/openimsdk/open-im-server/pull/3127)
* fix: the user sets the conversation timer cleanup timestamp unit incorrectly [Created [#3128](https://github.com/openimsdk/open-im-server/pull/3128)
* fix: seq conversion not reading env in docker environment [Created [#3131](https://github.com/openimsdk/open-im-server/pull/3131)
### Builds
* build: improve workflows contents. [Created [#3125](https://github.com/openimsdk/open-im-server/pull/3125)
## [v3.8.3-patch.6](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.6) (2025-07-23)
### Bug Fixes
* fix: Add friend DB in notification sender [#3438](https://github.com/openimsdk/open-im-server/pull/3438)
* fix: remove update version file workflows have new line in 3.8.3-patch branch. [#3452](https://github.com/openimsdk/open-im-server/pull/3452)
* fix: s3 aws init [#3454](https://github.com/openimsdk/open-im-server/pull/3454)
* fix: use safe submodule init in workflows in v3.8.3-patch. [#3469](https://github.com/openimsdk/open-im-server/pull/3469)
**Full Changelog**: [v3.8.3-patch.5...v3.8.3-patch.6](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.5...v3.8.3-patch.6)
## [v3.8.3-patch.5](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.5) (2025-06-10)
### New Features
* feat: optimize friend and group applications [#3396](https://github.com/openimsdk/open-im-server/pull/3396)
### Bug Fixes
* fix: solve unocrrect invite notification [Created [#3219](https://github.com/openimsdk/open-im-server/pull/3219)
### Builds
* build: update gomake version in dockerfile.[Patch branch] [#3416](https://github.com/openimsdk/open-im-server/pull/3416)
**Full Changelog**: [v3.8.3...v3.8.3-patch.5](https://github.com/openimsdk/open-im-server/compare/v3.8.3...v3.8.3-patch.5)
## [v3.8.3-patch.4](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.4) (2025-03-13)
### Bug Fixes
* fix: solve unocrrect invite notificationfrom #3213
**Full Changelog**: [v3.8.3-patch.3...v3.8.3-patch.4](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.3...v3.8.3-patch.4)
## [v3.8.3-patch.3](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.3) (2025-03-07)
### New Features
* feat: optimizing BatchGetIncrementalGroupMember #3180
### Bug Fixes
* fix: solve uncorrect notification when set group info #3172
* fix: the sorting is wrong after canceling the administrator in group settings #3185
* fix: solve uncorrect GroupMember enter group notification type. #3188
### Refactors
* refactor: change sendNotification to sendMessage to avoid ambiguity regarding message sending behavior. #3173
**Full Changelog**: [v3.8.3-patch.2...v3.8.3-patch.3](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.2...v3.8.3-patch.3)
## [v3.8.3-patch.2](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.2) (2025-02-28)
### Bug Fixes
* fix: Offline push does not have a badge && Android offline push (#3146) [#3174](https://github.com/openimsdk/open-im-server/pull/3174)
**Full Changelog**: [v3.8.3-patch.1...v3.8.3-patch.2](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.1...v3.8.3-patch.2)
## [v3.8.3-patch.1](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.1) (2025-02-25)
### New Features
* feat: add backup volume && optimize log print [Created [#3121](https://github.com/openimsdk/open-im-server/pull/3121)
### Bug Fixes
* fix: seq conversion failed without exiting [Created [#3120](https://github.com/openimsdk/open-im-server/pull/3120)
* fix: check error in BatchSetTokenMapByUidPid [Created [#3123](https://github.com/openimsdk/open-im-server/pull/3123)
* fix: DeleteDoc crash [Created [#3124](https://github.com/openimsdk/open-im-server/pull/3124)
* fix: the abnormal message has no sending time, causing the SDK to be abnormal [Created [#3126](https://github.com/openimsdk/open-im-server/pull/3126)
* fix: crash caused [#3127](https://github.com/openimsdk/open-im-server/pull/3127)
* fix: the user sets the conversation timer cleanup timestamp unit incorrectly [Created [#3128](https://github.com/openimsdk/open-im-server/pull/3128)
* fix: seq conversion not reading env in docker environment [Created [#3131](https://github.com/openimsdk/open-im-server/pull/3131)
### Builds
* build: improve workflows contents. [Created [#3125](https://github.com/openimsdk/open-im-server/pull/3125)
**Full Changelog**: [v3.8.3-e-v1.1.5...v3.8.3-patch.1-e-v1.1.5](https://github.com/openimsdk/open-im-server-enterprise/compare/v3.8.3-e-v1.1.5...v3.8.3-patch.1-e-v1.1.5)

View File

@ -1,48 +1,66 @@
---
title: 'OpenIM Configuration Files and Common Configuration Item Modifications Guide'
# OpenIM Configuration File Descriptions and Common Configuration Modifications
## Configuration Files Explanation
## External Component Configurations
| Configuration File | Description |
| ------------------------------- | ------------------------------------------------------------ |
| **kafka.yml** | Configurations for Kafka username, password, address, etc. |
| **redis.yml** | Configurations for Redis password, address, etc. |
| **minio.yml** | Configurations for MinIO username, password, address, and external IP/domain; failing to modify external IP or domain may cause image file sending failures |
| **zookeeper.yml** | Configurations for ZooKeeper user, password, address, etc. |
| **mongodb.yml** | Configurations for MongoDB username, password, address, etc. |
| **log.yml** | Configurations for log level and storage directory. |
| **notification.yml** | Configurations for events like adding friends, creating groups, etc. |
| **share.yml** | Common configurations needed by various OpenIM services, such as secret. |
| **webhooks.yml** | Configurations for URLs in Webhook. |
| **local-cache.yml** | Local cache configurations. |
| **openim-rpc-third.yml** | Configurations for listening IP, port, and storage settings for images and videos in openim-rpc-third service. |
| **openim-rpc-user.yml** | Configurations for listening IP and port in openim-rpc-user service. |
| **openim-api.yml** | Configurations for listening IP, port, etc., in openim-api service. |
| **openim-crontask.yml** | Configurations for openim-crontask service. |
| **openim-msggateway.yml** | Configurations for listening IP, port, etc., in openim-msggateway service. |
| **openim-msgtransfer.yml** | Configurations for openim-msgtransfer service. |
| **openim-push.yml** | Configurations for listening IP, port, and offline push settings in openim-push service. |
| **openim-rpc-auth.yml** | Configurations for listening IP, port, and token expiration settings in openim-rpc-auth service. |
| **openim-rpc-conversation.yml** | Configurations for listening IP, port, etc., in openim-rpc-conversation service. |
| **openim-rpc-friend.yml** | Configurations for listening IP, port, etc., in openim-rpc-friend service. |
| **openim-rpc-group.yml** | Configurations for listening IP, port, etc., in openim-rpc-group service. |
| **openim-rpc-msg.yml** | Configurations for listening IP, port, and whether to verify friendship before sending messages in openim-rpc-msg service. |
| Configuration File | Description |
| ------------------ |-------------------------------------------------------------|
| **kafka.yml** | Configuration for Kafka username, password, address, etc. |
| **redis.yml** | Configuration for Redis password, address, etc. |
| **minio.yml** | Configuration for MinIO username, password, address, etc. |
| **mongodb.yml** | Configuration for MongoDB username, password, address, etc. |
| **discovery.yml** | Service discovery and etcd credentials and address. |
## Common Configuration Item Modifications
## OpenIMServer Related Configurations
| Configuration File | Description |
| ------------------------------- | ---------------------------------------------- |
| **log.yml** | Configuration for logging levels and storage directory |
| **notification.yml** | Event notification settings (e.g., add friend, create group) |
| **share.yml** | Common settings for all services (e.g., secrets) |
| **webhooks.yml** | Webhook URLs and related settings |
| **local-cache.yml** | Local cache settings (generally do not modify) |
| **openim-rpc-third.yml** | openim-rpc-third listen IP, port, and object storage settings |
| **openim-rpc-user.yml** | openim-rpc-user listen IP and port settings |
| **openim-api.yml** | openim-api listen IP, port, and other settings |
| **openim-crontask.yml** | openim-crontask scheduled task settings |
| **openim-msggateway.yml** | openim-msggateway listen IP, port, and other settings |
| **openim-msgtransfer.yml** | Settings for openim-msgtransfer service |
| **openim-push.yml** | openim-push listen IP, port, and offline push settings |
| **openim-rpc-auth.yml** | openim-rpc-auth listen IP, port, token validity settings |
| **openim-rpc-conversation.yml** | openim-rpc-conversation listen IP and port settings |
| **openim-rpc-friend.yml** | openim-rpc-friend listen IP and port settings |
| **openim-rpc-group.yml** | openim-rpc-group listen IP and port settings |
| **openim-rpc-msg.yml** | openim-rpc-msg listen IP and port settings |
| Configuration Item Modification | Configuration File |
| ----------------------------------------------------- | ----------------------- |
| Using MinIO for image and video file object storage | `minio.yml` |
| Adjusting production environment logs | `log.yml` |
| Verifying friendship before sending messages | `openim-rpc-msg.yml` |
| Modifying secret | `share.yml` |
| Using OSS, COS, AWS, Kodo for image and video storage | `openim-rpc-third.yml` |
| Setting multiple login policy | `openim-msggateway.yml` |
| Setting up offline push | `openim-push.yml` |
## Starting Multiple Instances of an OpenIM Service
## Monitoring and Alerting Related Configurations
| Configuration File | Description |
| ------------------------------ | --------------- |
| **prometheus.yml** | Prometheus configuration |
| **instance-down-rules.yml** | Alert rules |
| **alertmanager.yml** | Alertmanager configuration |
| **email.tmpl** | Email alert template |
| **grefana-template/Demo.json** | Default Grafana dashboard |
To start multiple instances of an OpenIM service, simply increase the corresponding port numbers and modify the `start-config.yml` file in the project root directory. Restart the service to take effect. For example, the configuration to start 2 instances of `openim-rpc-user` is as follows:
## Common Configuration Modifications
| Configuration Item | Configuration File |
| -------------------------------------------------------- | ----------------------- |
| Configure MinIO as object storage (focus on the externalAddress field) | `minio.yml` |
| Adjust log level and number of log files | `log.yml` |
| Enable or disable friend verification when sending messages | `openim-rpc-msg.yml` |
| OpenIMServer secret | `share.yml` |
| Configure OSS, COS, AWS, or Kodo as object storage | `openim-rpc-third.yml` |
| Multi-end mutual kick strategy and max concurrent connections per gateway | `openim-msggateway.yml` |
| Offline message push configuration | `openim-push.yml` |
| Configure webhooks for callback notifications (e.g., before/after message send) | `webhooks.yml` |
| Whether new group members can view historical messages | `openim-rpc-group.yml` |
| Token expiration time settings | `openim-rpc-auth.yml` |
| Scheduled task settings (e.g., how long to retain messages) | `openim-crontask.yml` |
## Starting Multiple Instances of a Service and Maximum File Descriptors
To start multiple instances of an OpenIM service, simply add the corresponding port numbers and modify the `start-config.yml` file in the projects root directory,
then restart the service. For example, to start 2 instances of `openim-rpc-user`:
```yaml
rpc:
@ -55,9 +73,15 @@ prometheus:
ports: [ 20100, 20101 ]
```
Modify `start-config.yml`:
Modify`start-config.yml`:
```yaml
serviceBinaries:
openim-rpc-user: 2
```
To set the maximum number of open file descriptors (typically one per online user):
```
maxFileDescriptors: 10000
```

View File

@ -1,45 +1,63 @@
# OpenIM配置文件说明以及常用配置修改说明
## 配置文件说明
## 外部组件相关配置
| Configuration File | Description |
| ------------------------------- | ------------------------------------------------------------ |
| **kafka.yml** | Kafka用户名、密码、地址等配置 |
| **redis.yml** | Redis密码、地址等配置 |
| **minio.yml** | MinIO用户名、密码、地址及外网IP域名等配置未修改外网IP或域名可能导致图片文件发送失败 |
| **zookeeper.yml** | ZooKeeper用户、密码、地址等配置 |
| **mongodb.yml** | MongoDB用户名、密码、地址等配置 |
| **log.yml** | 日志级别及存储目录等配置 |
| **notification.yml** | 添加好友、创建群组等事件通知配置 |
| **share.yml** | OpenIM各服务所需的公共配置如secret等 |
| **webhooks.yml** | Webhook中URL等配置 |
| **local-cache.yml** | 本地缓存配置 |
| **openim-rpc-third.yml** | openim-rpc-third服务的监听IP、端口及图片视频对象存储配置 |
| **openim-rpc-user.yml** | openim-rpc-user服务的监听IP、端口配置 |
| **openim-api.yml** | openim-api服务的监听IP、端口等配置项 |
| **openim-crontask.yml** | openim-crontask服务配置 |
| **openim-msggateway.yml** | openim-msggateway服务的监听IP、端口等配置 |
| **openim-msgtransfer.yml** | openim-msgtransfer服务配置 |
| **openim-push.yml** | openim-push服务的监听IP、端口及离线推送配置 |
| **openim-rpc-auth.yml** | openim-rpc-auth服务的监听IP、端口及token有效期等配置 |
| **openim-rpc-conversation.yml** | openim-rpc-conversation服务的监听IP、端口等配置 |
| **openim-rpc-friend.yml** | openim-rpc-friend服务的监听IP、端口等配置 |
| **openim-rpc-group.yml** | openim-rpc-group服务的监听IP、端口等配置 |
| **openim-rpc-msg.yml** | openim-rpc-msg服务的监听IP、端口及消息发送是否验证好友关系等配置 |
| Configuration File | Description |
| ------------------ | ---------------------------------- |
| **kafka.yml** | Kafka用户名、密码、地址等配置 |
| **redis.yml** | Redis密码、地址等配置 |
| **minio.yml** | MinIO用户名、密码、地址等配置 |
| **mongodb.yml** | MongoDB用户名、密码、地址等配置 |
| **discovery.yml** | 服务发现以及etcd用户名、密码、地址 |
## OpenIMServer相关配置
| Configuration File | Description |
| ------------------------------- | ---------------------------------------------- |
| **log.yml** | 日志级别及存储目录等配置 |
| **notification.yml** | 添加好友、创建群组等事件通知配置 |
| **share.yml** | 各服务所需的公共配置如secret等 |
| **webhooks.yml** | Webhook中URL等配置 |
| **local-cache.yml** | 本地缓存配置,一般不用修改 |
| **openim-rpc-third.yml** | openim-rpc-third监听IP、端口及对象存储配置 |
| **openim-rpc-user.yml** | openim-rpc-user监听IP、端口配置 |
| **openim-api.yml** | openim-api监听IP、端口等配置 |
| **openim-crontask.yml** | openim-crontask定时任务配置 |
| **openim-msggateway.yml** | openim-msggateway监听IP、端口等配置 |
| **openim-msgtransfer.yml** | openim-msgtransfer服务配置 |
| **openim-push.yml** | openim-push监听IP、端口及离线推送配置 |
| **openim-rpc-auth.yml** | openim-rpc-auth监听IP、端口及token有效期等配置 |
| **openim-rpc-conversation.yml** | openim-rpc-conversation监听IP、端口等配置 |
| **openim-rpc-friend.yml** | openim-rpc-friend监听IP、端口等配置 |
| **openim-rpc-group.yml** | openim-rpc-group监听IP、端口等配置 |
| **openim-rpc-msg.yml** | openim-rpc-msg服务的监听IP、端口等配置 |
## 监控告警相关配置
| Configuration File | Description |
| ------------------------------ | --------------- |
| **prometheus.yml** | prometheus配置 |
| **instance-down-rules.yml** | 告警规则 |
| **alertmanager.yml** | 告警管理配置 |
| **email.tmpl** | 邮件告警模版 |
| **grefana-template/Demo.json** | 默认的dashboard |
## 常用配置修改
| 修改配置项 | 配置文件 |
| -------------------------------------------------------- | ----------------------- |
| 使用minio作为对象存储时配置重点关注externalAddress字段 | `minio.yml` |
| 日志级别及日志文件数量调整 | `log.yml` |
| 发送消息是否需要验证好友关系 | `openim-rpc-msg.yml` |
| OpenIMServer秘钥 | `share.yml` |
| 使用oss, cos, aws, kodo作为对象存储时配置 | `openim-rpc-third.yml` |
| 多端互踢策略单个gateway同时最大连接数 | `openim-msggateway.yml` |
| 消息离线推送 | `openim-push.yml` |
| 配置webhook来通知回调服务器如消息发送前后回调 | `webhooks.yml` |
| 新入群用户是否可以查看历史消息 | `openim-rpc-group.yml` |
| token 过期时间设置 | `openim-rpc-auth.yml` |
| 定时任务设置,例如消息保存多长时间 | `openim-crontask.yml` |
| 修改配置项 | 配置文件 |
| ----------------------------------------------- | ----------------------- |
| 使用minio作为图片视频文件对象存储 | `minio.yml` |
| 生产环境日志调整 | `log.yml` |
| 发送消息是否验证好友关系 | `openim-rpc-msg.yml` |
| 修改secret | `share.yml` |
| 使用oss, cos, aws, kodo作为图片视频文件对象存储 | `openim-rpc-third.yml` |
| 设置多端互踢策略 | `openim-msggateway.yml` |
| 设置离线推送 | `openim-push.yml` |
## 启动某个服务的多个实例和最大文件句柄数
## 启动某个OpenIM服务的多个实例
若要启动某个OpenIM的多个实例只需增加对应的端口数并修改项目根目录下的`start-config.yml`文件重启服务即可生效。例如启动2个`openim-rpc-user`实例的配置如下:
@ -61,5 +79,8 @@ serviceBinaries:
openim-rpc-user: 2
```
修改最大同时打开的文件句柄数,一般是每个在线用户占用一个
```
maxFileDescriptors: 10000
```

View File

@ -1,7 +1,7 @@
# URI for database connection, leave empty if using address and credential settings directly
uri:
uri:
# List of MongoDB server addresses
address: [ localhost:37017 ]
address: [localhost:37017]
# Name of the database
database: openim_v3
# Username for database authentication
@ -14,3 +14,38 @@ authSource: openim_v3
maxPoolSize: 100
# Maximum number of retry attempts for a failed database connection
maxRetry: 10
# MongoDB Mode, including "standalone", "replicaSet"
mongoMode: "standalone"
# The following configurations only take effect when mongoMode is set to "replicaSet"
replicaSet:
name: rs0
hosts: [127.0.0.1:37017, 127.0.0.1:37018, 127.0.0.1:37019]
# Read concern level: "local", "available", "majority", "linearizable", "snapshot"
readConcern: majority
# maximum staleness of data in seconds
maxStaleness: 90s
# The following configurations only take effect when mongoMode is set to "replicaSet"
readPreference:
# Read preference mode, can be "primary", "primaryPreferred", "secondary", "secondaryPreferred", "nearest"
mode: primary
maxStaleness: 90s
# TagSets is an array of maps with priority based on order, empty map must be placed last for fallback tagSets
tagSets:
- datacenter: "cn-east"
rack: "1"
storage: "ssd"
- datacenter: "cn-east"
storage: "ssd"
- datacenter: "cn-east"
- {} # Empty map, indicates any node
# The following configurations only take effect when mongoMode is set to "replicaSet"
writeConcern:
# Write node count or tag (int, "majority", or custom tag)
w: majority
# Whether to wait for journal confirmation
j: true
# Write timeout duration
wtimeout: 30s

View File

@ -176,17 +176,38 @@ services:
environment:
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
TZ: Asia/Shanghai
# Unique identifier for the Kafka node (required in controller mode)
KAFKA_CFG_NODE_ID: 0
# Defines the roles this Kafka node plays: broker, controller, or both
KAFKA_CFG_PROCESS_ROLES: controller,broker
# Specifies which nodes are controller nodes for quorum voting.
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
# The controller listener endpoint here is kafka:9093
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
# Specifies which listener is used for controller-to-controller communication
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
# Default number of partitions for new topics
KAFKA_NUM_PARTITIONS: 8
# Whether to enable automatic topic creation
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
# Kafka internal listeners; Kafka supports multiple ports with different protocols
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
# CONTROLLER for controller communication, EXTERNAL for external client connections.
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
# 9093 for controller traffic, and 9094 for external access.
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
# so other containers can access Kafka via kafka:9092.
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
# Maps logical listener names to actual protocols.
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
# Defines which listener is used for inter-broker communication within the Kafka cluster
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
# Authentication configuration variables - comment out to disable auth
# KAFKA_USERNAME: "openIM"

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.92
github.com/openimsdk/tools v0.0.50-alpha.97
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

4
go.sum
View File

@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.92 h1:hWfykMhmi7EQEiwgQccJqbgggIuhun/PrVkBnjmj9Ec=
github.com/openimsdk/tools v0.0.50-alpha.92/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY=
github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config)
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config)
msgTransfer := &MsgTransfer{
historyConsumer: historyConsumer,
@ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error {
}()
go func() {
fn := func(ctx context.Context, key string, value []byte) error {
m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value)
fn := func(msg mq.Message) error {
m.historyMongoHandler.HandleChatWs2Mongo(msg)
return nil
}
for {

View File

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/mq"
"sync"
"time"
@ -26,6 +27,8 @@ import (
"github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -37,7 +40,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
)
const (
@ -76,6 +78,7 @@ type ConsumerMessage struct {
Ctx context.Context
Key string
Value []byte
Raw mq.Message
}
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
@ -112,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
b.Do = och.do
och.redisMessageBatches = b
och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) {
lastMessage.Raw.Mark()
lastMessage.Raw.Commit()
}
return &och, nil
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) {
@ -134,53 +142,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
var conversationID string
var userSeqMap map[string]int64
// Outer map: conversationID -> (userID -> maxHasReadSeq)
conversationUserSeq := make(map[string]map[string]int64)
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
continue
}
//The conversation ID for each batch of messages processed by the batcher is the same.
conversationID = tips.ConversationID
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
continue
}
if userSeqMap == nil {
userSeqMap = make(map[string]int64)
}
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
continue
// Calculate the max seq from tips.Seqs
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
}
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
}
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
}
if userSeqMap == nil {
return
}
if len(conversationID) == 0 {
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
}
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
// persist to db
for convID, userSeqMap := range conversationUserSeq {
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
}
}
}
@ -392,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte
return mcontext.SetOperationID(ctx, allMessageOperationID)
}
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group
err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value})
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group
err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg})
if err != nil {
log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value)
log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value())
}
return nil
}

View File

@ -15,12 +15,12 @@
package msgtransfer
import (
"context"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"google.golang.org/protobuf/proto"
@ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas
}
}
func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) {
func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) {
ctx := val.Context()
key := val.Key()
msg := val.Value()
msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
@ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
prommetrics.MsgInsertMongoFailedCounter.Inc()
} else {
prommetrics.MsgInsertMongoSuccessCounter.Inc()
val.Mark()
}
for _, msgData := range msgFromMQ.MsgData {

View File

@ -2,6 +2,7 @@ package push
import (
"context"
"github.com/openimsdk/tools/mq"
"math/rand"
"strconv"
@ -106,8 +107,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
go func() {
pushHandler.WaitCache()
fn := func(ctx context.Context, key string, value []byte) error {
pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value)
fn := func(msg mq.Message) error {
pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(msg.Context()), msg.Value())
return nil
}
consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32())))
@ -121,8 +122,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
}()
go func() {
fn := func(ctx context.Context, key string, value []byte) error {
offlineHandler.HandleMsg2OfflinePush(ctx, value)
fn := func(msg mq.Message) error {
offlineHandler.HandleMsg2OfflinePush(msg.Context(), msg.Value())
return nil
}
consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32())))

View File

@ -71,15 +71,39 @@ type Minio struct {
}
type Mongo struct {
URI string `yaml:"uri"`
Address []string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
AuthSource string `yaml:"authSource"`
MaxPoolSize int `yaml:"maxPoolSize"`
MaxRetry int `yaml:"maxRetry"`
URI string `yaml:"uri"`
Address []string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
AuthSource string `yaml:"authSource"`
MaxPoolSize int `yaml:"maxPoolSize"`
MaxRetry int `yaml:"maxRetry"`
MongoMode string `yaml:"mongoMode"`
ReplicaSet ReplicaSetConfig
ReadPreference ReadPrefConfig
WriteConcern WriteConcernConfig
}
type ReplicaSetConfig struct {
Name string `yaml:"name"`
Hosts []string `yaml:"hosts"`
ReadConcern string `yaml:"readConcern"`
MaxStaleness time.Duration `yaml:"maxStaleness"`
}
type ReadPrefConfig struct {
Mode string `yaml:"mode"`
TagSets []map[string]string `yaml:"tagSets"`
MaxStaleness time.Duration `yaml:"maxStaleness"`
}
type WriteConcernConfig struct {
W any `yaml:"w"`
J bool `yaml:"j"`
WTimeout time.Duration `yaml:"wtimeout"`
}
type Kafka struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
@ -493,6 +517,23 @@ func (m *Mongo) Build() *mongoutil.Config {
AuthSource: m.AuthSource,
MaxPoolSize: m.MaxPoolSize,
MaxRetry: m.MaxRetry,
MongoMode: m.MongoMode,
ReplicaSet: &mongoutil.ReplicaSetConfig{
Name: m.ReplicaSet.Name,
Hosts: m.ReplicaSet.Hosts,
ReadConcern: m.ReplicaSet.ReadConcern,
MaxStaleness: m.ReplicaSet.MaxStaleness,
},
ReadPreference: &mongoutil.ReadPrefConfig{
Mode: m.ReadPreference.Mode,
TagSets: m.ReadPreference.TagSets,
MaxStaleness: m.ReadPreference.MaxStaleness,
},
WriteConcern: &mongoutil.WriteConcernConfig{
W: m.WriteConcern.W,
J: m.WriteConcern.J,
WTimeout: m.WriteConcern.WTimeout,
},
}
}

View File

@ -76,6 +76,27 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
if err != nil {
return nil, err
}
values, err := fn(ctx, ids)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "ids", ids)
return nil, err
}
idToValue := make(map[K]*V)
for _, value := range values {
idToValue[vId(value)] = value
}
getSlotValues := func(slotIds []K) []*V {
if len(slotIds) == 0 {
return nil
}
slotValues := make([]*V, 0, len(slotIds))
for _, id := range slotIds {
if value, ok := idToValue[id]; ok {
slotValues = append(slotValues, value)
}
}
return slotValues
}
result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys {
indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
@ -86,16 +107,12 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
idIndex[id] = index
queryIds = append(queryIds, id)
}
values, err := fn(ctx, queryIds)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "keys", keys, "queryIds", queryIds)
return nil, err
}
if len(values) == 0 {
slotValues := getSlotValues(queryIds)
if len(slotValues) == 0 {
return map[int]string{}, nil
}
cacheIndex := make(map[int]string)
for _, value := range values {
for _, value := range slotValues {
id := vId(value)
index, ok := idIndex[id]
if !ok {

View File

@ -321,7 +321,12 @@ func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID prim
}
func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) {
filter := bson.M{}
filter := bson.M{
"msgs.msg": bson.M{
"$exists": true,
"$type": "object",
},
}
if req.RecvID != "" {
filter["$or"] = bson.A{
bson.M{"msgs.msg.recv_id": req.RecvID},

View File

@ -9,6 +9,8 @@ import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -148,3 +150,29 @@ func TestName5(t *testing.T) {
// }
// t.Log(seq, sendTime)
//}
func TestSearchMessage(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
msgMongo, err := NewMsgMongo(cli.Database("openim_v3"))
if err != nil {
panic(err)
}
ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli()
t.Log(ts)
req := &msg.SearchMessageReq{
//SendID: "yjz",
//RecvID: "aibot",
Pagination: &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 20,
},
}
count, resp, err := msgMongo.SearchMessage(ctx, req)
if err != nil {
panic(err)
}
t.Log(resp, count)
}