mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-27 22:12:15 +08:00
Merge branch 'openimsdk:main' into main
This commit is contained in:
commit
6b119bd9d9
2
.env
2
.env
@ -4,7 +4,7 @@ REDIS_IMAGE=redis:7.0.0
|
||||
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
|
||||
KAFKA_IMAGE=bitnami/kafka:3.5.1
|
||||
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
|
||||
|
||||
ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13
|
||||
|
||||
OPENIM_WEB_FRONT_IMAGE=openim/openim-web-front:release-v3.5.1
|
||||
OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.7
|
||||
|
||||
59
.github/dependabot.yml
vendored
59
.github/dependabot.yml
vendored
@ -1,59 +0,0 @@
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "gomod"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
time: "08:00"
|
||||
labels:
|
||||
- "dependencies"
|
||||
commit-message:
|
||||
prefix: "feat"
|
||||
include: "scope"
|
||||
groups:
|
||||
gomod-deps:
|
||||
patterns:
|
||||
- "*"
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
time: "08:00"
|
||||
labels:
|
||||
- "dependencies"
|
||||
commit-message:
|
||||
prefix: "chore"
|
||||
include: "scope"
|
||||
groups:
|
||||
github-actions:
|
||||
patterns:
|
||||
- "*"
|
||||
- package-ecosystem: "docker"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
time: "08:00"
|
||||
labels:
|
||||
- "dependencies"
|
||||
commit-message:
|
||||
prefix: "feat"
|
||||
include: "scope"
|
||||
@ -1,7 +1,9 @@
|
||||
# 如何给OpenIM贡献代码(提交pull request)
|
||||
|
||||
|
||||
# 如何给 OpenIM 贡献代码(提交 Pull Request)
|
||||
|
||||
<p align="center">
|
||||
<a href="./CONTRIBUTING.md">Englist</a> ·
|
||||
<a href="./CONTRIBUTING.md">English</a> ·
|
||||
<a href="./CONTRIBUTING-zh_CN.md">中文</a> ·
|
||||
<a href="docs/contributing/CONTRIBUTING-UA.md">Українська</a> ·
|
||||
<a href="docs/contributing/CONTRIBUTING-CS.md">Česky</a> ·
|
||||
@ -28,55 +30,67 @@
|
||||
<a href="docs/contributing/CONTRIBUTING-TR.md">Türkçe</a>
|
||||
</p>
|
||||
|
||||
本指南将以 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) 为例,详细说明如何为 OpenIM 项目贡献代码。我们采用“一问题一分支”的策略,确保每个 Issue 都对应一个专门的分支,以便有效管理代码变更。
|
||||
|
||||
本指南将以 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server)为例详细说明如何为 OpenIM 项目贡献代码。我们采用“一问题一分支”的策略,确保每个 Issue 都对应一个专门的分支,以便有效管理代码变更。
|
||||
|
||||
## 1. Fork 仓库
|
||||
### 1. Fork 仓库
|
||||
前往 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) GitHub 页面,点击右上角的 "Fork" 按钮,将仓库 Fork 到你的 GitHub 账户下。
|
||||
|
||||
## 2. 克隆仓库
|
||||
### 2. 克隆仓库
|
||||
将你 Fork 的仓库克隆到本地:
|
||||
```bash
|
||||
git clone https://github.com/your-username/open-im-server.git
|
||||
```
|
||||
|
||||
## 3. 设置远程上游
|
||||
### 3. 设置远程上游
|
||||
添加原始仓库为远程上游以便跟踪其更新:
|
||||
```bash
|
||||
git remote add upstream https://github.com/openimsdk/open-im-server.git
|
||||
```
|
||||
|
||||
## 4. 创建 Issue
|
||||
在原始仓库中创建一个新的 Issue,详细描述你遇到的问题或希望添加的新功能。
|
||||
### 4. 创建 Issue
|
||||
在原始仓库中创建一个新的 Issue,详细描述你遇到的问题或希望添加
|
||||
|
||||
## 5. 创建新分支
|
||||
的新功能。
|
||||
|
||||
### 5. 创建新分支
|
||||
基于主分支创建一个新分支,并使用描述性的名称与 Issue ID,例如:
|
||||
```bash
|
||||
git checkout -b fix-bug-123
|
||||
```
|
||||
|
||||
## 6. 提交更改
|
||||
### 6. 提交更改
|
||||
在你的本地分支上进行更改后,提交这些更改:
|
||||
```bash
|
||||
git add .
|
||||
git commit -m "Describe your changes in detail"
|
||||
```
|
||||
|
||||
## 7. 推送分支
|
||||
### 7. 推送分支
|
||||
将你的分支推送回你的 GitHub Fork:
|
||||
```bash
|
||||
git push origin fix-bug-123
|
||||
```
|
||||
|
||||
## 8. 创建 Pull Request
|
||||
### 8. 创建 Pull Request
|
||||
在 GitHub 上转到你的 Fork 仓库,点击 "Pull Request" 按钮。确保 PR 描述清楚,并链接到相关的 Issue。
|
||||
|
||||
## 9. 签署 CLA
|
||||
### 9. 签署 CLA
|
||||
如果这是你第一次提交 PR,你需要在 PR 的评论中回复:
|
||||
```
|
||||
I have read the CLA Document and I hereby sign the CLA
|
||||
```
|
||||
## 其他说明
|
||||
|
||||
如果需要将同一修改提交到两个不同的分支(例如 `main` 和 `release-v3.7`),应从对应的远程分支分别创建两个新分支。首先在一个分支上完成修改,然后使用 `cherry-pick` 命令将这些更改应用到另一个分支。之后,为每个分支独立提交 Pull Request。
|
||||
### 编程规范
|
||||
请参考以下文档以了解关于 Go 语言编程规范的详细信息:
|
||||
- [Go 编码规范](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/go-code.md)
|
||||
- [代码约定](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/code-conventions.md)
|
||||
|
||||
### 日志规范
|
||||
- **禁止使用标准的 `log` 包**。
|
||||
- 应使用 `"github.com/openimsdk/tools/log"` 包来打印日志,该包支持多种日志级别:`debug`、`info`、`warn`、`error`。
|
||||
- **错误日志应仅在首次调用的函数中打印**,以防止日志重复,并确保错误的上下文清晰。
|
||||
|
||||
### 异常及错误处理
|
||||
- **禁止使用 `panic`**:程序中不应使用 `panic`,以避免在遇到不可恢复的错误时突然终止。
|
||||
- **错误包裹**:使用 `"github.com/openimsdk/tools/errs"` 来包裹错误,保持错误信息的完整性并增加调试便利。
|
||||
- **错误传递**:如果函数本身不能处理错误,应将错误返回给调用者,而不是隐藏或忽略这些错误。
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
# How do I contribute code to OpenIM
|
||||
# How to Contribute to OpenIM (Submitting Pull Requests)
|
||||
|
||||
<p align="center">
|
||||
<a href="./CONTRIBUTING.md">Englist</a> ·
|
||||
<a href="./CONTRIBUTING.md">English</a> ·
|
||||
<a href="./CONTRIBUTING-zh_CN.md">中文</a> ·
|
||||
<a href="docs/contributing/CONTRIBUTING-UA.md">Українська</a> ·
|
||||
<a href="docs/contributing/CONTRIBUTING-CS.md">Česky</a> ·
|
||||
@ -27,13 +27,14 @@
|
||||
<a href="docs/contributing/CONTRIBUTING-GR.md">Ελληνικά</a> ·
|
||||
<a href="docs/contributing/CONTRIBUTING-TR.md">Türkçe</a>
|
||||
</p>
|
||||
This guide will explain in detail how to contribute code to the OpenIM project, using `openimsdk/open-im-server` as an example. We adopt a "one issue, one branch" strategy to ensure each issue corresponds to a dedicated branch, allowing for effective management of code changes.
|
||||
|
||||
This guide will use [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) as an example to explain in detail how to contribute code to the OpenIM project. We adopt a "one issue, one branch" strategy to ensure each issue corresponds to a dedicated branch for effective code change management.
|
||||
|
||||
### 1. Fork the Repository
|
||||
Go to the `openimsdk/open-im-server` GitHub page, click the "Fork" button in the upper right corner to fork the repository to your GitHub account.
|
||||
Go to the [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) GitHub page, click the "Fork" button in the upper right corner to fork the repository to your GitHub account.
|
||||
|
||||
### 2. Clone the Repository
|
||||
Clone the forked repository to your local machine:
|
||||
Clone the repository you forked to your local machine:
|
||||
```bash
|
||||
git clone https://github.com/your-username/open-im-server.git
|
||||
```
|
||||
@ -45,19 +46,21 @@ git remote add upstream https://github.com/openimsdk/open-im-server.git
|
||||
```
|
||||
|
||||
### 4. Create an Issue
|
||||
Create a new issue in the original repository describing the problem you are facing or the new feature you want to add. For significant feature adjustments, propose an RFC issue to facilitate broad discussion and participation from community members.
|
||||
Create a new issue in the original repository detailing the problem you encountered or the new feature you wish to add.
|
||||
|
||||
### 5. Create a New Branch
|
||||
Create a new branch based on the main branch and name it descriptively, including the Issue ID, for example:
|
||||
Create a new branch off the main branch with a descriptive name and Issue ID, for example:
|
||||
```bash
|
||||
git checkout -b fix-bug-123
|
||||
```
|
||||
|
||||
### 6. Commit Changes
|
||||
After making changes on your local branch, commit them:
|
||||
After making changes on your local branch, commit these changes:
|
||||
```bash
|
||||
git add .
|
||||
git commit -m "Describe your changes in detail"
|
||||
git commit -m "Describe your changes
|
||||
|
||||
in detail"
|
||||
```
|
||||
|
||||
### 7. Push the Branch
|
||||
@ -67,15 +70,25 @@ git push origin fix-bug-123
|
||||
```
|
||||
|
||||
### 8. Create a Pull Request
|
||||
Go to your fork on GitHub, click the "Pull Request" button. Make sure the PR description is clear and links to the related Issue.
|
||||
#### 🅰 Fixed issue #issueID
|
||||
Go to your fork on GitHub and click the "Pull Request" button. Ensure the PR description is clear and links to the related issue.
|
||||
|
||||
### 9. Sign the CLA
|
||||
If this is your first time submitting a PR, you need to reply in the PR comments:
|
||||
If this is your first time submitting a PR, you will need to reply in the comments of the PR:
|
||||
```
|
||||
I have read the CLA Document and I hereby sign the CLA
|
||||
```
|
||||
|
||||
### Additional Notes
|
||||
If the same modification needs to be submitted to two different branches (e.g., main and release-v3.7), create two new branches from the corresponding remote branches. First complete the modification in one branch, then use the `cherry-pick` command to apply these changes to the other branch. After that, submit a separate Pull Request for each branch.
|
||||
### Programming Standards
|
||||
Please refer to the following documents for detailed information on Go language programming standards:
|
||||
- [Go Coding Standards](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/go-code.md)
|
||||
- [Code Conventions](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/code-conventions.md)
|
||||
|
||||
### Logging Standards
|
||||
- **Do not use the standard `log` package**.
|
||||
- Use the `"github.com/openimsdk/tools/log"` package for logging, which supports multiple log levels: `debug`, `info`, `warn`, `error`.
|
||||
- **Error logs should only be printed in the function where they are first actively called** to prevent log duplication and ensure clear error context.
|
||||
|
||||
### Exception and Error Handling
|
||||
- **Prohibit the use of `panic`**: The code should not use `panic` to avoid abrupt termination when encountering unrecoverable errors.
|
||||
- **Error Wrapping**: Use `"github.com/openimsdk/tools/errs"` to wrap errors, maintaining the integrity of error information and facilitating debugging.
|
||||
- **Error Propagation**: If a function cannot handle an error itself, it should return the error to the caller, rather than hiding or ignoring it.
|
||||
|
||||
13
config/discovery.yml
Normal file
13
config/discovery.yml
Normal file
@ -0,0 +1,13 @@
|
||||
enable: "etcd"
|
||||
etcd:
|
||||
rootDirectory: openim
|
||||
address: [ localhost:12379 ]
|
||||
username: ''
|
||||
password: ''
|
||||
|
||||
zookeeper:
|
||||
schema: openim
|
||||
address: [ localhost:12181 ]
|
||||
username: ''
|
||||
password: ''
|
||||
|
||||
@ -29,19 +29,4 @@ object:
|
||||
accessKeyID: ''
|
||||
accessKeySecret: ''
|
||||
sessionToken: ''
|
||||
publicRead: false
|
||||
kodo:
|
||||
endpoint: "webhook://s3.cn-east-1.qiniucs.com"
|
||||
bucket: "demo-9999999"
|
||||
bucketURL: "webhook://your.domain.com"
|
||||
accessKeyID: ''
|
||||
accessKeySecret: ''
|
||||
sessionToken: ''
|
||||
publicRead: false
|
||||
aws:
|
||||
endpoint: "''"
|
||||
region: "us-east-1"
|
||||
bucket: "demo-9999999"
|
||||
accessKeyID: ''
|
||||
accessKeySecret: ''
|
||||
publicRead: false
|
||||
publicRead: false
|
||||
@ -1,5 +1,4 @@
|
||||
secret: openIM123
|
||||
env: zookeeper
|
||||
rpcRegisterName:
|
||||
user: user
|
||||
friend: friend
|
||||
|
||||
@ -1,6 +0,0 @@
|
||||
|
||||
schema: openim
|
||||
address: [ localhost:12181 ]
|
||||
username: ''
|
||||
password: ''
|
||||
|
||||
@ -58,6 +58,26 @@ services:
|
||||
networks:
|
||||
- openim
|
||||
|
||||
etcd:
|
||||
image: "${ETCD_IMAGE}"
|
||||
container_name: etcd
|
||||
ports:
|
||||
- "12379:2379"
|
||||
- "12380:2380"
|
||||
environment:
|
||||
- ETCD_NAME=s1
|
||||
- ETCD_DATA_DIR=/etcd-data
|
||||
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
|
||||
- ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
|
||||
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
|
||||
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380
|
||||
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
|
||||
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
|
||||
- ETCD_INITIAL_CLUSTER_STATE=new
|
||||
restart: always
|
||||
networks:
|
||||
- openim
|
||||
|
||||
kafka:
|
||||
image: "${KAFKA_IMAGE}"
|
||||
container_name: kafka
|
||||
|
||||
10
go.mod
10
go.mod
@ -14,7 +14,7 @@ require (
|
||||
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.65
|
||||
github.com/openimsdk/tools v0.0.49-alpha.2
|
||||
github.com/openimsdk/tools v0.0.49-alpha.19
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
@ -44,7 +44,6 @@ require (
|
||||
golang.org/x/sync v0.6.0
|
||||
)
|
||||
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.112.0 // indirect
|
||||
cloud.google.com/go/compute v1.23.3 // indirect
|
||||
@ -59,6 +58,8 @@ require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
|
||||
github.com/clbanning/mxj v1.8.4 // indirect
|
||||
github.com/coreos/go-semver v0.3.0 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
@ -75,7 +76,7 @@ require (
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-zookeeper/zk v1.0.3 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
github.com/google/s2a-go v0.1.7 // indirect
|
||||
@ -138,6 +139,9 @@ require (
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
|
||||
|
||||
19
go.sum
19
go.sum
@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
|
||||
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
|
||||
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
|
||||
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
|
||||
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg
|
||||
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
|
||||
@ -132,8 +137,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
|
||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
@ -283,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
|
||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
|
||||
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws=
|
||||
github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko=
|
||||
github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs=
|
||||
github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
|
||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
|
||||
go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8=
|
||||
go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js=
|
||||
go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI=
|
||||
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
|
||||
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
|
||||
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
|
||||
|
||||
@ -38,20 +38,17 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
RpcConfig config.API
|
||||
MongodbConfig config.Mongo
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
MinioConfig config.Minio
|
||||
API config.API
|
||||
Share config.Share
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, index int, config *Config) error {
|
||||
apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
|
||||
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index)
|
||||
prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error {
|
||||
var client discovery.SvcDiscoveryRegistry
|
||||
|
||||
// Determine whether zk is passed according to whether it is a clustered deployment
|
||||
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
|
||||
client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, "failed to register discovery service")
|
||||
}
|
||||
@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error {
|
||||
)
|
||||
|
||||
router := newGinRouter(client, config)
|
||||
if config.RpcConfig.Prometheus.Enable {
|
||||
if config.API.Prometheus.Enable {
|
||||
go func() {
|
||||
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
|
||||
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
|
||||
@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error {
|
||||
}()
|
||||
|
||||
}
|
||||
address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort))
|
||||
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
|
||||
|
||||
server := http.Server{Addr: address, Handler: router}
|
||||
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine {
|
||||
@ -25,7 +26,6 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
|
||||
_ = v.RegisterValidation("required_if", RequiredIf)
|
||||
}
|
||||
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
|
||||
// init rpc client here
|
||||
userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway,
|
||||
config.Share.IMAdminUserID)
|
||||
@ -34,39 +34,39 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg)
|
||||
conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation)
|
||||
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
|
||||
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL)
|
||||
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)
|
||||
|
||||
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
|
||||
u := NewUserApi(*userRpc)
|
||||
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
|
||||
ParseToken := GinParseToken(authRpc)
|
||||
userRouterGroup := r.Group("/user")
|
||||
{
|
||||
userRouterGroup.POST("/user_register", u.UserRegister)
|
||||
userRouterGroup.POST("/update_user_info", ParseToken, u.UpdateUserInfo)
|
||||
userRouterGroup.POST("/update_user_info_ex", ParseToken, u.UpdateUserInfoEx)
|
||||
userRouterGroup.POST("/set_global_msg_recv_opt", ParseToken, u.SetGlobalRecvMessageOpt)
|
||||
userRouterGroup.POST("/get_users_info", ParseToken, u.GetUsersPublicInfo)
|
||||
userRouterGroup.POST("/get_all_users_uid", ParseToken, u.GetAllUsersID)
|
||||
userRouterGroup.POST("/account_check", ParseToken, u.AccountCheck)
|
||||
userRouterGroup.POST("/get_users", ParseToken, u.GetUsers)
|
||||
userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus)
|
||||
userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail)
|
||||
userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus)
|
||||
userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus)
|
||||
userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus)
|
||||
userRouterGroup.POST("/update_user_info", u.UpdateUserInfo)
|
||||
userRouterGroup.POST("/update_user_info_ex", u.UpdateUserInfoEx)
|
||||
userRouterGroup.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt)
|
||||
userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo)
|
||||
userRouterGroup.POST("/get_all_users_uid", u.GetAllUsersID)
|
||||
userRouterGroup.POST("/account_check", u.AccountCheck)
|
||||
userRouterGroup.POST("/get_users", u.GetUsers)
|
||||
userRouterGroup.POST("/get_users_online_status", u.GetUsersOnlineStatus)
|
||||
userRouterGroup.POST("/get_users_online_token_detail", u.GetUsersOnlineTokenDetail)
|
||||
userRouterGroup.POST("/subscribe_users_status", u.SubscriberStatus)
|
||||
userRouterGroup.POST("/get_users_status", u.GetUserStatus)
|
||||
userRouterGroup.POST("/get_subscribe_users_status", u.GetSubscribeUsersStatus)
|
||||
|
||||
userRouterGroup.POST("/process_user_command_add", ParseToken, u.ProcessUserCommandAdd)
|
||||
userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete)
|
||||
userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate)
|
||||
userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet)
|
||||
userRouterGroup.POST("/process_user_command_get_all", ParseToken, u.ProcessUserCommandGetAll)
|
||||
userRouterGroup.POST("/process_user_command_add", u.ProcessUserCommandAdd)
|
||||
userRouterGroup.POST("/process_user_command_delete", u.ProcessUserCommandDelete)
|
||||
userRouterGroup.POST("/process_user_command_update", u.ProcessUserCommandUpdate)
|
||||
userRouterGroup.POST("/process_user_command_get", u.ProcessUserCommandGet)
|
||||
userRouterGroup.POST("/process_user_command_get_all", u.ProcessUserCommandGetAll)
|
||||
|
||||
userRouterGroup.POST("/add_notification_account", ParseToken, u.AddNotificationAccount)
|
||||
userRouterGroup.POST("/update_notification_account", ParseToken, u.UpdateNotificationAccountInfo)
|
||||
userRouterGroup.POST("/search_notification_account", ParseToken, u.SearchNotificationAccount)
|
||||
userRouterGroup.POST("/add_notification_account", u.AddNotificationAccount)
|
||||
userRouterGroup.POST("/update_notification_account", u.UpdateNotificationAccountInfo)
|
||||
userRouterGroup.POST("/search_notification_account", u.SearchNotificationAccount)
|
||||
}
|
||||
// friend routing group
|
||||
friendRouterGroup := r.Group("/friend", ParseToken)
|
||||
friendRouterGroup := r.Group("/friend")
|
||||
{
|
||||
f := NewFriendApi(*friendRpc)
|
||||
friendRouterGroup.POST("/delete_friend", f.DeleteFriend)
|
||||
@ -88,7 +88,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
friendRouterGroup.POST("/update_friends", f.UpdateFriends)
|
||||
}
|
||||
g := NewGroupApi(*groupRpc)
|
||||
groupRouterGroup := r.Group("/group", ParseToken)
|
||||
groupRouterGroup := r.Group("/group")
|
||||
{
|
||||
groupRouterGroup.POST("/create_group", g.CreateGroup)
|
||||
groupRouterGroup.POST("/set_group_info", g.SetGroupInfo)
|
||||
@ -120,12 +120,12 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
{
|
||||
a := NewAuthApi(*authRpc)
|
||||
authRouterGroup.POST("/user_token", a.UserToken)
|
||||
authRouterGroup.POST("/get_user_token", ParseToken, a.GetUserToken)
|
||||
authRouterGroup.POST("/get_user_token", a.GetUserToken)
|
||||
authRouterGroup.POST("/parse_token", a.ParseToken)
|
||||
authRouterGroup.POST("/force_logout", ParseToken, a.ForceLogout)
|
||||
authRouterGroup.POST("/force_logout", a.ForceLogout)
|
||||
}
|
||||
// Third service
|
||||
thirdGroup := r.Group("/third", ParseToken)
|
||||
thirdGroup := r.Group("/third")
|
||||
{
|
||||
t := NewThirdApi(*thirdRpc)
|
||||
thirdGroup.GET("/prometheus", t.GetPrometheus)
|
||||
@ -137,7 +137,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
logs.POST("/delete", t.DeleteLogs)
|
||||
logs.POST("/search", t.SearchLogs)
|
||||
|
||||
objectGroup := r.Group("/object", ParseToken)
|
||||
objectGroup := r.Group("/object")
|
||||
|
||||
objectGroup.POST("/part_limit", t.PartLimit)
|
||||
objectGroup.POST("/part_size", t.PartSize)
|
||||
@ -150,7 +150,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
objectGroup.GET("/*name", t.ObjectRedirect)
|
||||
}
|
||||
// Message
|
||||
msgGroup := r.Group("/msg", ParseToken)
|
||||
msgGroup := r.Group("/msg")
|
||||
{
|
||||
msgGroup.POST("/newest_seq", m.GetSeq)
|
||||
msgGroup.POST("/search_msg", m.SearchMsg)
|
||||
@ -174,7 +174,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
msgGroup.POST("/get_server_time", m.GetServerTime)
|
||||
}
|
||||
// Conversation
|
||||
conversationGroup := r.Group("/conversation", ParseToken)
|
||||
conversationGroup := r.Group("/conversation")
|
||||
{
|
||||
c := NewConversationApi(*conversationRpc)
|
||||
conversationGroup.POST("/get_sorted_conversation_list", c.GetSortedConversationList)
|
||||
@ -185,7 +185,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
|
||||
}
|
||||
|
||||
statisticsGroup := r.Group("/statistics", ParseToken)
|
||||
statisticsGroup := r.Group("/statistics")
|
||||
{
|
||||
statisticsGroup.POST("/user/register", u.UserRegisterCount)
|
||||
statisticsGroup.POST("/user/active", m.GetActiveUser)
|
||||
@ -199,6 +199,13 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
switch c.Request.Method {
|
||||
case http.MethodPost:
|
||||
for _, wApi := range Whitelist {
|
||||
if strings.HasPrefix(c.Request.URL.Path, wApi) {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
token := c.Request.Header.Get(constant.Token)
|
||||
if token == "" {
|
||||
log.ZWarn(c, "header get token error", servererrs.ErrArgs.WrapMsg("header must have token"))
|
||||
@ -218,3 +225,10 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Whitelist api not parse token
|
||||
var Whitelist = []string{
|
||||
"/user/user_register",
|
||||
"/auth/user_token",
|
||||
"/auth/parse_token",
|
||||
}
|
||||
|
||||
@ -286,6 +286,7 @@ func (c *Client) KickOnlineMessage() error {
|
||||
resp := Resp{
|
||||
ReqIdentifier: WSKickOnlineMsg,
|
||||
}
|
||||
log.ZDebug(c.ctx, "KickOnlineMessage debug ")
|
||||
err := c.writeBinaryMsg(resp)
|
||||
c.close()
|
||||
return err
|
||||
|
||||
@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
|
||||
}
|
||||
|
||||
func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
|
||||
return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
|
||||
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
|
||||
conf.MsgGateway.RPC.RegisterIP,
|
||||
conf.MsgGateway.RPC.Ports, index,
|
||||
conf.Share.RpcRegisterName.MessageGateway,
|
||||
|
||||
@ -24,10 +24,10 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
MsgGateway config.MsgGateway
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
MsgGateway config.MsgGateway
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
// Start run ws server.
|
||||
|
||||
@ -211,7 +211,8 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
|
||||
|
||||
// Online push user online message to other node
|
||||
for _, v := range conns {
|
||||
v := v // safe closure var
|
||||
v := v
|
||||
log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
|
||||
if v.Target() == ws.disCov.GetSelfConnTarget() {
|
||||
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
|
||||
continue
|
||||
@ -267,7 +268,9 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
if ws.msgGatewayConfig.Share.Env == "zookeeper" {
|
||||
log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", ws.msgGatewayConfig.Discovery.Enable)
|
||||
|
||||
if ws.msgGatewayConfig.Discovery.Enable != "k8s" {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
@ -56,13 +56,13 @@ type MsgTransfer struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
MsgTransfer config.MsgTransfer
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
KafkaConfig config.Kafka
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
MsgTransfer config.MsgTransfer
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
KafkaConfig config.Kafka
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, index int, config *Config) error {
|
||||
@ -76,7 +76,7 @@ func Start(ctx context.Context, index int, config *Config) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
|
||||
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -83,7 +83,7 @@ type OnlineHistoryRedisConsumerHandler struct {
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
|
||||
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic})
|
||||
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct {
|
||||
}
|
||||
|
||||
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
|
||||
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic})
|
||||
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -12,11 +12,6 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
KUBERNETES = "k8s"
|
||||
ZOOKEEPER = "zookeeper"
|
||||
)
|
||||
|
||||
type OnlinePusher interface {
|
||||
GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error)
|
||||
@ -42,10 +37,12 @@ func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
|
||||
}
|
||||
|
||||
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
|
||||
switch config.Share.Env {
|
||||
case KUBERNETES:
|
||||
switch config.Discovery.Enable {
|
||||
case "k8s":
|
||||
return NewK8sStaticConsistentHash(disCov, config)
|
||||
case ZOOKEEPER:
|
||||
case "zookeeper":
|
||||
return NewDefaultAllNode(disCov, config)
|
||||
case "etcd":
|
||||
return NewDefaultAllNode(disCov, config)
|
||||
default:
|
||||
return newEmptyOnlinePUsher()
|
||||
@ -64,7 +61,12 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D
|
||||
func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway)
|
||||
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
||||
if len(conns) == 0 {
|
||||
log.ZWarn(ctx, "get gateway conn 0 ", nil)
|
||||
} else {
|
||||
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -85,10 +87,12 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M
|
||||
// Online push message
|
||||
for _, conn := range conns {
|
||||
conn := conn // loop var safe
|
||||
ctx := ctx
|
||||
wg.Go(func() error {
|
||||
msgClient := msggateway.NewMsgGatewayClient(conn)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ", err, "req:", input.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -24,11 +24,11 @@ type Config struct {
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
KafkaConfig config.Kafka
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
|
||||
|
||||
@ -60,7 +60,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
|
||||
var consumerHandler ConsumerHandler
|
||||
var err error
|
||||
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
|
||||
[]string{config.KafkaConfig.ToPushTopic})
|
||||
[]string{config.KafkaConfig.ToPushTopic}, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -45,10 +45,10 @@ type authServer struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
RpcConfig config.Auth
|
||||
RedisConfig config.Redis
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
Share config.Share
|
||||
RpcConfig config.Auth
|
||||
RedisConfig config.Redis
|
||||
Share config.Share
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -51,10 +51,10 @@ type Config struct {
|
||||
RpcConfig config.Conversation
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -50,14 +50,15 @@ type friendServer struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
RpcConfig config.Friend
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
RpcConfig config.Friend
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
//ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -68,11 +68,11 @@ type Config struct {
|
||||
RpcConfig config.Group
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -59,11 +59,11 @@ type (
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
KafkaConfig config.Kafka
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -46,11 +46,11 @@ type Config struct {
|
||||
RpcConfig config.Third
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
MinioConfig config.Minio
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -61,11 +61,11 @@ type Config struct {
|
||||
RedisConfig config.Redis
|
||||
MongodbConfig config.Mongo
|
||||
KafkaConfig config.Kafka
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
|
||||
@ -33,9 +33,9 @@ import (
|
||||
)
|
||||
|
||||
type CronTaskConfig struct {
|
||||
CronTask config.CronTask
|
||||
ZookeeperConfig config.ZooKeeper
|
||||
Share config.Share
|
||||
CronTask config.CronTask
|
||||
Share config.Share
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *CronTaskConfig) error {
|
||||
@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
|
||||
if config.CronTask.RetainChatRecords < 1 {
|
||||
return errs.New("msg destruct time must be greater than 1").Wrap()
|
||||
}
|
||||
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
|
||||
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, "failed to register discovery service")
|
||||
}
|
||||
|
||||
@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd {
|
||||
var apiConfig api.Config
|
||||
ret := &ApiCmd{apiConfig: &apiConfig}
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMAPICfgFileName: &apiConfig.RpcConfig,
|
||||
ZookeeperConfigFileName: &apiConfig.ZookeeperConfig,
|
||||
OpenIMAPICfgFileName: &apiConfig.API,
|
||||
ShareFileName: &apiConfig.Share,
|
||||
DiscoveryConfigFilename: &apiConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
|
||||
@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
|
||||
RedisConfigFileName: &authConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &authConfig.ZookeeperConfig,
|
||||
ShareFileName: &authConfig.Share,
|
||||
DiscoveryConfigFilename: &authConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *AuthRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
|
||||
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ var (
|
||||
LocalCacheConfigFileName string
|
||||
KafkaConfigFileName string
|
||||
RedisConfigFileName string
|
||||
ZookeeperConfigFileName string
|
||||
MongodbConfigFileName string
|
||||
MinioConfigFileName string
|
||||
LogConfigFileName string
|
||||
@ -42,6 +41,7 @@ var (
|
||||
OpenIMRPCMsgCfgFileName string
|
||||
OpenIMRPCThirdCfgFileName string
|
||||
OpenIMRPCUserCfgFileName string
|
||||
DiscoveryConfigFilename string
|
||||
)
|
||||
|
||||
var ConfigEnvPrefixMap map[string]string
|
||||
@ -54,7 +54,6 @@ func init() {
|
||||
LocalCacheConfigFileName = "local-cache.yml"
|
||||
KafkaConfigFileName = "kafka.yml"
|
||||
RedisConfigFileName = "redis.yml"
|
||||
ZookeeperConfigFileName = "zookeeper.yml"
|
||||
MongodbConfigFileName = "mongodb.yml"
|
||||
MinioConfigFileName = "minio.yml"
|
||||
LogConfigFileName = "log.yml"
|
||||
@ -70,16 +69,17 @@ func init() {
|
||||
OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
|
||||
OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
|
||||
OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
|
||||
DiscoveryConfigFilename = "discovery.yml"
|
||||
|
||||
ConfigEnvPrefixMap = make(map[string]string)
|
||||
fileNames := []string{
|
||||
FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName,
|
||||
KafkaConfigFileName, RedisConfigFileName, ZookeeperConfigFileName,
|
||||
KafkaConfigFileName, RedisConfigFileName,
|
||||
MongodbConfigFileName, MinioConfigFileName, LogConfigFileName,
|
||||
OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
|
||||
OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
|
||||
OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
|
||||
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName,
|
||||
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
|
||||
}
|
||||
|
||||
for _, fileName := range fileNames {
|
||||
|
||||
@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
|
||||
RedisConfigFileName: &conversationConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &conversationConfig.MongodbConfig,
|
||||
ShareFileName: &conversationConfig.Share,
|
||||
NotificationFileName: &conversationConfig.NotificationConfig,
|
||||
LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &conversationConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *ConversationRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
|
||||
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
|
||||
}
|
||||
|
||||
@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd {
|
||||
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
|
||||
ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig,
|
||||
ShareFileName: &cronTaskConfig.Share,
|
||||
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
|
||||
@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
|
||||
RedisConfigFileName: &friendConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &friendConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &friendConfig.MongodbConfig,
|
||||
ShareFileName: &friendConfig.Share,
|
||||
NotificationFileName: &friendConfig.NotificationConfig,
|
||||
WebhooksConfigFileName: &friendConfig.WebhooksConfig,
|
||||
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &friendConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *FriendRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
|
||||
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start)
|
||||
}
|
||||
|
||||
@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
|
||||
RedisConfigFileName: &groupConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &groupConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &groupConfig.MongodbConfig,
|
||||
ShareFileName: &groupConfig.Share,
|
||||
NotificationFileName: &groupConfig.NotificationConfig,
|
||||
WebhooksConfigFileName: &groupConfig.WebhooksConfig,
|
||||
LocalCacheConfigFileName: &groupConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &groupConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *GroupRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
|
||||
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start)
|
||||
}
|
||||
|
||||
@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig,
|
||||
RedisConfigFileName: &msgConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &msgConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &msgConfig.MongodbConfig,
|
||||
KafkaConfigFileName: &msgConfig.KafkaConfig,
|
||||
ShareFileName: &msgConfig.Share,
|
||||
NotificationFileName: &msgConfig.NotificationConfig,
|
||||
WebhooksConfigFileName: &msgConfig.WebhooksConfig,
|
||||
LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &msgConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *MsgRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
|
||||
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
|
||||
}
|
||||
|
||||
@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
|
||||
ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig}
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
|
||||
ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig,
|
||||
ShareFileName: &msgGatewayConfig.Share,
|
||||
WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
|
||||
DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
|
||||
@ -37,9 +37,9 @@ func NewMsgTransferCmd() *MsgTransferCmd {
|
||||
RedisConfigFileName: &msgTransferConfig.RedisConfig,
|
||||
MongodbConfigFileName: &msgTransferConfig.MongodbConfig,
|
||||
KafkaConfigFileName: &msgTransferConfig.KafkaConfig,
|
||||
ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig,
|
||||
ShareFileName: &msgTransferConfig.Share,
|
||||
WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig,
|
||||
DiscoveryConfigFilename: &msgTransferConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
|
||||
@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMPushCfgFileName: &pushConfig.RpcConfig,
|
||||
RedisConfigFileName: &pushConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &pushConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &pushConfig.MongodbConfig,
|
||||
KafkaConfigFileName: &pushConfig.KafkaConfig,
|
||||
ShareFileName: &pushConfig.Share,
|
||||
NotificationFileName: &pushConfig.NotificationConfig,
|
||||
WebhooksConfigFileName: &pushConfig.WebhooksConfig,
|
||||
LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &pushConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *PushRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
|
||||
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
|
||||
}
|
||||
|
||||
@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
|
||||
RedisConfigFileName: &thirdConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &thirdConfig.MongodbConfig,
|
||||
ShareFileName: &thirdConfig.Share,
|
||||
NotificationFileName: &thirdConfig.NotificationConfig,
|
||||
MinioConfigFileName: &thirdConfig.MinioConfig,
|
||||
LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &thirdConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *ThirdRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
|
||||
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
|
||||
}
|
||||
|
||||
@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd {
|
||||
ret.configMap = map[string]any{
|
||||
OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
|
||||
RedisConfigFileName: &userConfig.RedisConfig,
|
||||
ZookeeperConfigFileName: &userConfig.ZookeeperConfig,
|
||||
MongodbConfigFileName: &userConfig.MongodbConfig,
|
||||
KafkaConfigFileName: &userConfig.KafkaConfig,
|
||||
ShareFileName: &userConfig.Share,
|
||||
NotificationFileName: &userConfig.NotificationConfig,
|
||||
WebhooksConfigFileName: &userConfig.WebhooksConfig,
|
||||
LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
|
||||
DiscoveryConfigFilename: &userConfig.Discovery,
|
||||
}
|
||||
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
|
||||
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
|
||||
@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error {
|
||||
}
|
||||
|
||||
func (a *UserRpcCmd) runE() error {
|
||||
return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
|
||||
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
|
||||
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
|
||||
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start)
|
||||
}
|
||||
|
||||
@ -345,7 +345,6 @@ type AfterConfig struct {
|
||||
|
||||
type Share struct {
|
||||
Secret string `mapstructure:"secret"`
|
||||
Env string `mapstructure:"env"`
|
||||
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
|
||||
IMAdminUserID []string `mapstructure:"imAdminUserID"`
|
||||
}
|
||||
@ -432,6 +431,19 @@ type ZooKeeper struct {
|
||||
Password string `mapstructure:"password"`
|
||||
}
|
||||
|
||||
type Discovery struct {
|
||||
Enable string `mapstructure:"enable"`
|
||||
Etcd Etcd `mapstructure:"etcd"`
|
||||
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
|
||||
}
|
||||
|
||||
type Etcd struct {
|
||||
RootDirectory string `mapstructure:"rootDirectory"`
|
||||
Address []string `mapstructure:"address"`
|
||||
Username string `mapstructure:"username"`
|
||||
Password string `mapstructure:"password"`
|
||||
}
|
||||
|
||||
func (m *Mongo) Build() *mongoutil.Config {
|
||||
return &mongoutil.Config{
|
||||
Uri: m.URI,
|
||||
|
||||
@ -18,36 +18,34 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
"github.com/openimsdk/tools/discovery/etcd"
|
||||
"github.com/openimsdk/tools/discovery/zookeeper"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
zookeeperConst = "zookeeper"
|
||||
kubenetesConst = "k8s"
|
||||
directConst = "direct"
|
||||
)
|
||||
|
||||
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
|
||||
switch share.Env {
|
||||
case zookeeperConst:
|
||||
|
||||
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
|
||||
switch discovery.Enable {
|
||||
case "zookeeper":
|
||||
return zookeeper.NewZkClient(
|
||||
zookeeperConfig.Address,
|
||||
zookeeperConfig.Schema,
|
||||
discovery.ZooKeeper.Address,
|
||||
discovery.ZooKeeper.Schema,
|
||||
zookeeper.WithFreq(time.Hour),
|
||||
zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
|
||||
zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
|
||||
zookeeper.WithRoundRobin(),
|
||||
zookeeper.WithTimeout(10),
|
||||
)
|
||||
case kubenetesConst:
|
||||
case "k8s":
|
||||
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
|
||||
case directConst:
|
||||
//return direct.NewConnDirect(config)
|
||||
case "etcd":
|
||||
return etcd.NewSvcDiscoveryRegistry(
|
||||
discovery.Etcd.RootDirectory,
|
||||
discovery.Etcd.Address,
|
||||
etcd.WithDialTimeout(10*time.Second),
|
||||
etcd.WithMaxCallSendMsgSize(20*1024*1024),
|
||||
etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))
|
||||
default:
|
||||
return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
|
||||
return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap()
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
15
pkg/common/discoveryregister/etcd/doc.go
Normal file
15
pkg/common/discoveryregister/etcd/doc.go
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"
|
||||
@ -1,44 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package zookeeper
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||
func getEnv(key, fallback string) string {
|
||||
if value, exists := os.LookupEnv(key); exists {
|
||||
return value
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables.
|
||||
// If the environment variables are not set, it returns the fallback value.
|
||||
func getZkAddrFromEnv(fallback []string) []string {
|
||||
address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS")
|
||||
port, portExists := os.LookupEnv("ZOOKEEPER_PORT")
|
||||
|
||||
if addrExists && portExists {
|
||||
addresses := strings.Split(address, ",")
|
||||
for i, addr := range addresses {
|
||||
addresses[i] = addr + ":" + port
|
||||
}
|
||||
return addresses
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
@ -44,7 +44,7 @@ import (
|
||||
)
|
||||
|
||||
// Start rpc server.
|
||||
func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP,
|
||||
func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP,
|
||||
registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context,
|
||||
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
|
||||
|
||||
@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
|
||||
}
|
||||
|
||||
defer listener.Close()
|
||||
client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share)
|
||||
client, err := kdisc.NewDiscoveryRegister(discovery, share)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/db/redisutil"
|
||||
"github.com/openimsdk/tools/discovery/etcd"
|
||||
"github.com/openimsdk/tools/discovery/zookeeper"
|
||||
"github.com/openimsdk/tools/mq/kafka"
|
||||
"github.com/openimsdk/tools/s3/minio"
|
||||
@ -43,6 +44,14 @@ func CheckZookeeper(ctx context.Context, config *config.ZooKeeper) error {
|
||||
return zookeeper.Check(ctx, config.Address, config.Schema, zookeeper.WithUserNameAndPassword(config.Username, config.Password))
|
||||
}
|
||||
|
||||
func CheckEtcd(ctx context.Context, config *config.Etcd) error {
|
||||
return etcd.Check(ctx, config.Address, "/check_openim_component",
|
||||
true,
|
||||
etcd.WithDialTimeout(10*time.Second),
|
||||
etcd.WithMaxCallSendMsgSize(20*1024*1024),
|
||||
etcd.WithUsernameAndPassword(config.Username, config.Password))
|
||||
}
|
||||
|
||||
func CheckMongo(ctx context.Context, config *config.Mongo) error {
|
||||
return mongoutil.Check(ctx, config.Build())
|
||||
}
|
||||
@ -59,14 +68,14 @@ func CheckKafka(ctx context.Context, conf *config.Kafka) error {
|
||||
return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic})
|
||||
}
|
||||
|
||||
func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.ZooKeeper, error) {
|
||||
func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) {
|
||||
var (
|
||||
mongoConfig = &config.Mongo{}
|
||||
redisConfig = &config.Redis{}
|
||||
kafkaConfig = &config.Kafka{}
|
||||
minioConfig = &config.Minio{}
|
||||
zookeeperConfig = &config.ZooKeeper{}
|
||||
thirdConfig = &config.Third{}
|
||||
mongoConfig = &config.Mongo{}
|
||||
redisConfig = &config.Redis{}
|
||||
kafkaConfig = &config.Kafka{}
|
||||
minioConfig = &config.Minio{}
|
||||
discovery = &config.Discovery{}
|
||||
thirdConfig = &config.Third{}
|
||||
)
|
||||
err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig)
|
||||
if err != nil {
|
||||
@ -96,11 +105,11 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka,
|
||||
} else {
|
||||
minioConfig = nil
|
||||
}
|
||||
err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig)
|
||||
err = config.LoadConfig(filepath.Join(configDir, cmd.DiscoveryConfigFilename), cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFilename], discovery)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
return mongoConfig, redisConfig, kafkaConfig, minioConfig, zookeeperConfig, nil
|
||||
return mongoConfig, redisConfig, kafkaConfig, minioConfig, discovery, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
@ -127,35 +136,40 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, zookeeperConfig *config.ZooKeeper, maxRetry int) error {
|
||||
func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, discovery *config.Discovery, maxRetry int) error {
|
||||
checksDone := make(map[string]bool)
|
||||
|
||||
checks := map[string]func() error{
|
||||
"Zookeeper": func() error {
|
||||
return CheckZookeeper(ctx, zookeeperConfig)
|
||||
},
|
||||
"Mongo": func() error {
|
||||
checks := map[string]func(ctx context.Context) error{
|
||||
"Mongo": func(ctx context.Context) error {
|
||||
return CheckMongo(ctx, mongoConfig)
|
||||
},
|
||||
"Redis": func() error {
|
||||
"Redis": func(ctx context.Context) error {
|
||||
return CheckRedis(ctx, redisConfig)
|
||||
},
|
||||
"Kafka": func() error {
|
||||
"Kafka": func(ctx context.Context) error {
|
||||
return CheckKafka(ctx, kafkaConfig)
|
||||
},
|
||||
}
|
||||
|
||||
if minioConfig != nil {
|
||||
checks["MinIO"] = func() error {
|
||||
checks["MinIO"] = func(ctx context.Context) error {
|
||||
return CheckMinIO(ctx, minioConfig)
|
||||
}
|
||||
}
|
||||
if discovery.Enable == "etcd" {
|
||||
checks["Etcd"] = func(ctx context.Context) error {
|
||||
return CheckEtcd(ctx, &discovery.Etcd)
|
||||
}
|
||||
} else if discovery.Enable == "zookeeper" {
|
||||
checks["Zookeeper"] = func(ctx context.Context) error {
|
||||
return CheckZookeeper(ctx, &discovery.ZooKeeper)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < maxRetry; i++ {
|
||||
allSuccess := true
|
||||
for name, check := range checks {
|
||||
if !checksDone[name] {
|
||||
if err := check(); err != nil {
|
||||
if err := check(ctx); err != nil {
|
||||
fmt.Printf("%s check failed: %v\n", name, err)
|
||||
allSuccess = false
|
||||
} else {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user