mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-26 14:19:38 +08:00
Merge branch 'openimsdk:main' into main
This commit is contained in:
commit
6e3c99ba69
2
.env
2
.env
@ -2,7 +2,7 @@ MONGO_IMAGE=mongo:7.0
|
|||||||
REDIS_IMAGE=redis:7.0.0
|
REDIS_IMAGE=redis:7.0.0
|
||||||
KAFKA_IMAGE=bitnami/kafka:3.5.1
|
KAFKA_IMAGE=bitnami/kafka:3.5.1
|
||||||
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
|
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
|
||||||
ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13
|
ETCD_IMAGE=bitnami/etcd:3.5.13
|
||||||
PROMETHEUS_IMAGE=prom/prometheus:v2.45.6
|
PROMETHEUS_IMAGE=prom/prometheus:v2.45.6
|
||||||
ALERTMANAGER_IMAGE=prom/alertmanager:v0.27.0
|
ALERTMANAGER_IMAGE=prom/alertmanager:v0.27.0
|
||||||
GRAFANA_IMAGE=grafana/grafana:11.0.1
|
GRAFANA_IMAGE=grafana/grafana:11.0.1
|
||||||
|
16
.github/workflows/merge-from-milestone.yml
vendored
16
.github/workflows/merge-from-milestone.yml
vendored
@ -155,6 +155,17 @@ jobs:
|
|||||||
'{title: $title, head: $head, base: $base, body: $body}')")
|
'{title: $title, head: $head, base: $base, body: $body}')")
|
||||||
|
|
||||||
new_pr_number=$(echo "$response" | jq -r '.number')
|
new_pr_number=$(echo "$response" | jq -r '.number')
|
||||||
|
|
||||||
|
if [[ "$new_pr_number" == "null" || -z "$new_pr_number" ]]; then
|
||||||
|
echo "Failed to create PR. Response: $response"
|
||||||
|
|
||||||
|
git checkout $TARGET_BRANCH
|
||||||
|
|
||||||
|
git branch -D $cherry_pick_branch
|
||||||
|
|
||||||
|
echo "Deleted branch: $cherry_pick_branch"
|
||||||
|
git push origin --delete $cherry_pick_branch
|
||||||
|
else
|
||||||
echo "Created PR #$new_pr_number"
|
echo "Created PR #$new_pr_number"
|
||||||
|
|
||||||
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
|
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
|
||||||
@ -162,4 +173,9 @@ jobs:
|
|||||||
-d '{"labels": ["milestone-merge"]}' \
|
-d '{"labels": ["milestone-merge"]}' \
|
||||||
"https://api.github.com/repos/${{ github.repository }}/issues/$new_pr_number/labels"
|
"https://api.github.com/repos/${{ github.repository }}/issues/$new_pr_number/labels"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "----------------------------------------"
|
||||||
|
echo ""
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
17
cmd/main.go
17
cmd/main.go
@ -3,7 +3,6 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@ -39,7 +38,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/system/program"
|
"github.com/openimsdk/tools/system/program"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/network"
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ip, err := network.GetLocalIP()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("prometheus listen %d error %w", port, err)
|
return fmt.Errorf("prometheus listen %d error %w", port, err)
|
||||||
}
|
}
|
||||||
defer listener.Close()
|
defer listener.Close()
|
||||||
log.ZDebug(ctx, "prometheus start", "addr", listener.Addr())
|
log.ZDebug(ctx, "prometheus start", "addr", listener.Addr())
|
||||||
target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
go func() {
|
go func() {
|
||||||
err := prommetrics.Start(listener)
|
err := prommetrics.Start(listener)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
|
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) {
|
||||||
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
|
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
|
||||||
if index := strings.Index(name, "."); index >= 0 {
|
if index := strings.Index(name, "."); index >= 0 {
|
||||||
name = name[:index]
|
name = name[:index]
|
||||||
@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C
|
|||||||
if err := cmd.parseConf(&conf); err != nil {
|
if err := cmd.parseConf(&conf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar())
|
return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
enable: etcd
|
enable: etcd
|
||||||
etcd:
|
etcd:
|
||||||
rootDirectory: openim
|
rootDirectory: openim
|
||||||
address: [ localhost:12379 ]
|
address: [localhost:12379]
|
||||||
username: ''
|
## Attention: If you set auth in etcd
|
||||||
password: ''
|
## you must also update the username and password in Chat project.
|
||||||
|
username:
|
||||||
|
password:
|
||||||
|
|
||||||
kubernetes:
|
kubernetes:
|
||||||
namespace: default
|
namespace: default
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
# Username for authentication
|
## Kafka authentication
|
||||||
username: ''
|
username:
|
||||||
# Password for authentication
|
password:
|
||||||
password: ''
|
|
||||||
# Producer acknowledgment settings
|
# Producer acknowledgment settings
|
||||||
producerAck:
|
producerAck:
|
||||||
# Compression type to use (e.g., none, gzip, snappy)
|
# Compression type to use (e.g., none, gzip, snappy)
|
||||||
compressType: none
|
compressType: none
|
||||||
# List of Kafka broker addresses
|
# List of Kafka broker addresses
|
||||||
address: [ localhost:19094 ]
|
address: [localhost:19094]
|
||||||
# Kafka topic for Redis integration
|
# Kafka topic for Redis integration
|
||||||
toRedisTopic: toRedis
|
toRedisTopic: toRedis
|
||||||
# Kafka topic for MongoDB integration
|
# Kafka topic for MongoDB integration
|
||||||
|
@ -1,7 +1,14 @@
|
|||||||
address: [ localhost:16379 ]
|
address: [localhost:16379]
|
||||||
username:
|
username:
|
||||||
password: openIM123
|
password: openIM123
|
||||||
clusterMode: false
|
# redis Mode, including "standalone","cluster","sentinel"
|
||||||
|
redisMode: "standalone"
|
||||||
db: 0
|
db: 0
|
||||||
maxRetry: 10
|
maxRetry: 10
|
||||||
poolSize: 100
|
poolSize: 100
|
||||||
|
# Sentinel configuration (only used when redisMode is "sentinel")
|
||||||
|
sentinelMode:
|
||||||
|
masterName: "redis-master"
|
||||||
|
sentinelsAddrs: ["127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"]
|
||||||
|
routeByLatency: true
|
||||||
|
routeRandomly: true
|
||||||
|
@ -16,7 +16,7 @@ afterUpdateUserInfoEx:
|
|||||||
afterSendSingleMsg:
|
afterSendSingleMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
# Only the recvID specified in attentionIds will send the callback
|
# Only the recvIDs specified in attentionIds will send the callback
|
||||||
# if not set, all user messages will be callback
|
# if not set, all user messages will be callback
|
||||||
attentionIds: []
|
attentionIds: []
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
@ -36,7 +36,7 @@ beforeMsgModify:
|
|||||||
afterSendGroupMsg:
|
afterSendGroupMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
# Only the recvID specified in attentionIds will send the callback
|
# Only the GroupIDs specified in attentionIds will send the callback
|
||||||
# if not set, all user messages will be callback
|
# if not set, all user messages will be callback
|
||||||
attentionIds: []
|
attentionIds: []
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
|
@ -83,8 +83,83 @@ services:
|
|||||||
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
|
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
|
||||||
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
|
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
|
||||||
- ETCD_INITIAL_CLUSTER_STATE=new
|
- ETCD_INITIAL_CLUSTER_STATE=new
|
||||||
|
- ALLOW_NONE_AUTHENTICATION=no
|
||||||
|
|
||||||
|
## Optional: Enable etcd authentication by setting the following credentials
|
||||||
|
# - ETCD_ROOT_USER=root
|
||||||
|
# - ETCD_ROOT_PASSWORD=openIM123
|
||||||
|
# - ETCD_USERNAME=openIM
|
||||||
|
# - ETCD_PASSWORD=openIM123
|
||||||
volumes:
|
volumes:
|
||||||
- "${DATA_DIR}/components/etcd:/etcd-data"
|
- "${DATA_DIR}/components/etcd:/etcd-data"
|
||||||
|
command: >
|
||||||
|
/bin/sh -c '
|
||||||
|
etcd &
|
||||||
|
export ETCDCTL_API=3
|
||||||
|
echo "Waiting for etcd to become healthy..."
|
||||||
|
until etcdctl --endpoints=http://127.0.0.1:2379 endpoint health &>/dev/null; do
|
||||||
|
echo "Waiting for ETCD to start..."
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "etcd is healthy."
|
||||||
|
|
||||||
|
if [ -n "$${ETCD_ROOT_USER}" ] && [ -n "$${ETCD_ROOT_PASSWORD}" ] && [ -n "$${ETCD_USERNAME}" ] && [ -n "$${ETCD_PASSWORD}" ]; then
|
||||||
|
echo "Authentication credentials provided. Setting up authentication..."
|
||||||
|
|
||||||
|
echo "Checking authentication status..."
|
||||||
|
if ! etcdctl --endpoints=http://127.0.0.1:2379 auth status | grep -q "Authentication Status: true"; then
|
||||||
|
echo "Authentication is disabled. Creating users and enabling..."
|
||||||
|
|
||||||
|
# Create users and setup permissions
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_ROOT_USER} --new-user-password=$${ETCD_ROOT_PASSWORD} || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} || true
|
||||||
|
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 role add openim-role || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite / || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite "" || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_USERNAME} openim-role || true
|
||||||
|
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true
|
||||||
|
|
||||||
|
echo "Enabling authentication..."
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 auth enable
|
||||||
|
echo "Authentication enabled successfully"
|
||||||
|
else
|
||||||
|
echo "Authentication is already enabled. Checking OpenIM user..."
|
||||||
|
|
||||||
|
# Check if openIM user exists and can perform operations
|
||||||
|
if ! etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-check" &>/dev/null; then
|
||||||
|
echo "OpenIM user test failed. Recreating user with root credentials..."
|
||||||
|
|
||||||
|
# Try to create/update the openIM user using root credentials
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} --no-password-file || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role add openim-role || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite / || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite "" || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user grant-role $${ETCD_USERNAME} openim-role || true
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true
|
||||||
|
|
||||||
|
echo "OpenIM user recreated with required permissions"
|
||||||
|
else
|
||||||
|
echo "OpenIM user exists and has correct permissions"
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth &>/dev/null
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
echo "Testing authentication with OpenIM user..."
|
||||||
|
if etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-works"; then
|
||||||
|
echo "Authentication working properly"
|
||||||
|
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth
|
||||||
|
else
|
||||||
|
echo "WARNING: Authentication test failed"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
echo "No authentication credentials provided. Running in no-auth mode."
|
||||||
|
echo "To enable authentication, set ETCD_ROOT_USER, ETCD_ROOT_PASSWORD, ETCD_USERNAME, and ETCD_PASSWORD environment variables."
|
||||||
|
fi
|
||||||
|
|
||||||
|
tail -f /dev/null
|
||||||
|
'
|
||||||
restart: always
|
restart: always
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
@ -104,12 +179,38 @@ services:
|
|||||||
KAFKA_CFG_NODE_ID: 0
|
KAFKA_CFG_NODE_ID: 0
|
||||||
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
||||||
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
||||||
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
|
|
||||||
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
|
|
||||||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
|
||||||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||||
KAFKA_NUM_PARTITIONS: 8
|
KAFKA_NUM_PARTITIONS: 8
|
||||||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
||||||
|
|
||||||
|
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
||||||
|
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
|
||||||
|
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
|
||||||
|
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
|
||||||
|
|
||||||
|
# Authentication configuration variables - comment out to disable auth
|
||||||
|
# KAFKA_USERNAME: "openIM"
|
||||||
|
# KAFKA_PASSWORD: "openIM123"
|
||||||
|
command: >
|
||||||
|
/bin/sh -c '
|
||||||
|
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
|
||||||
|
echo "=== Kafka SASL Authentication ENABLED ==="
|
||||||
|
echo "Username: $${KAFKA_USERNAME}"
|
||||||
|
|
||||||
|
# Set environment variables for SASL authentication
|
||||||
|
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
||||||
|
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
|
||||||
|
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
|
||||||
|
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
|
||||||
|
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
|
||||||
|
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
|
||||||
|
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
|
||||||
|
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Start Kafka with the configured environment
|
||||||
|
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
|
||||||
|
'
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
|
|
||||||
@ -161,9 +262,9 @@ services:
|
|||||||
- ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml
|
- ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml
|
||||||
- ${DATA_DIR}/components/prometheus/data:/prometheus
|
- ${DATA_DIR}/components/prometheus/data:/prometheus
|
||||||
command:
|
command:
|
||||||
- '--config.file=/etc/prometheus/prometheus.yml'
|
- "--config.file=/etc/prometheus/prometheus.yml"
|
||||||
- '--storage.tsdb.path=/prometheus'
|
- "--storage.tsdb.path=/prometheus"
|
||||||
- '--web.listen-address=:${PROMETHEUS_PORT}'
|
- "--web.listen-address=:${PROMETHEUS_PORT}"
|
||||||
network_mode: host
|
network_mode: host
|
||||||
|
|
||||||
alertmanager:
|
alertmanager:
|
||||||
@ -176,8 +277,8 @@ services:
|
|||||||
- ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
|
- ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
|
||||||
- ./config/email.tmpl:/etc/alertmanager/email.tmpl
|
- ./config/email.tmpl:/etc/alertmanager/email.tmpl
|
||||||
command:
|
command:
|
||||||
- '--config.file=/etc/alertmanager/alertmanager.yml'
|
- "--config.file=/etc/alertmanager/alertmanager.yml"
|
||||||
- '--web.listen-address=:${ALERTMANAGER_PORT}'
|
- "--web.listen-address=:${ALERTMANAGER_PORT}"
|
||||||
network_mode: host
|
network_mode: host
|
||||||
|
|
||||||
grafana:
|
grafana:
|
||||||
@ -209,9 +310,8 @@ services:
|
|||||||
- /sys:/host/sys:ro
|
- /sys:/host/sys:ro
|
||||||
- /:/rootfs:ro
|
- /:/rootfs:ro
|
||||||
command:
|
command:
|
||||||
- '--path.procfs=/host/proc'
|
- "--path.procfs=/host/proc"
|
||||||
- '--path.sysfs=/host/sys'
|
- "--path.sysfs=/host/sys"
|
||||||
- '--path.rootfs=/rootfs'
|
- "--path.rootfs=/rootfs"
|
||||||
- '--web.listen-address=:19100'
|
- "--web.listen-address=:19100"
|
||||||
network_mode: host
|
network_mode: host
|
||||||
|
|
||||||
|
2
go.mod
2
go.mod
@ -13,7 +13,7 @@ require (
|
|||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.12
|
github.com/openimsdk/protocol v0.0.73-alpha.12
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.84
|
github.com/openimsdk/tools v0.0.50-alpha.91
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
|
4
go.sum
4
go.sum
@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b
|
|||||||
github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
|
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8=
|
github.com/openimsdk/tools v0.0.50-alpha.91 h1:4zXtTwwCIUawet1VDvnD3C/1E4N4ostDfh+RfL5nz90=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
github.com/openimsdk/tools v0.0.50-alpha.91/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
@ -39,7 +39,7 @@ type Config struct {
|
|||||||
Index conf.Index
|
Index conf.Index
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||||
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
|
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -6,35 +6,29 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/tools/apiresp"
|
"github.com/openimsdk/tools/apiresp"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
"github.com/openimsdk/tools/discovery/etcd"
|
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PrometheusDiscoveryApi struct {
|
type PrometheusDiscoveryApi struct {
|
||||||
config *Config
|
config *Config
|
||||||
client *clientv3.Client
|
|
||||||
kv discovery.KeyValue
|
kv discovery.KeyValue
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi {
|
func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi {
|
||||||
api := &PrometheusDiscoveryApi{
|
api := &PrometheusDiscoveryApi{
|
||||||
config: config,
|
config: config,
|
||||||
}
|
kv: client,
|
||||||
if config.Discovery.Enable == conf.ETCD {
|
|
||||||
api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
|
|
||||||
}
|
}
|
||||||
return api
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
||||||
value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key))
|
value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, discovery.ErrNotSupportedKeyValue) {
|
if errors.Is(err, discovery.ErrNotSupported) {
|
||||||
c.JSON(http.StatusOK, []struct{}{})
|
c.JSON(http.StatusOK, []struct{}{})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
var resp prommetrics.RespTarget
|
var resp prommetrics.RespTarget
|
||||||
if err := json.Unmarshal(value, &resp); err != nil {
|
for i := range value {
|
||||||
|
var tmp prommetrics.Target
|
||||||
|
if err = json.Unmarshal(value[i], &tmp); err != nil {
|
||||||
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resp.Targets = append(resp.Targets, tmp.Target)
|
||||||
|
resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget
|
||||||
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp})
|
c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) {
|
func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
|
||||||
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
|
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -39,7 +39,7 @@ type Config struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start run ws server.
|
// Start run ws server.
|
||||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
|
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
|
||||||
"rpcPorts", conf.MsgGateway.RPC.Ports,
|
"rpcPorts", conf.MsgGateway.RPC.Ports,
|
||||||
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
|
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
|
||||||
|
132
internal/msgtransfer/callback.go
Normal file
132
internal/msgtransfer/callback.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
package msgtransfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
|
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
|
)
|
||||||
|
|
||||||
|
func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq {
|
||||||
|
return cbapi.CommonCallbackReq{
|
||||||
|
SendID: msg.SendID,
|
||||||
|
ServerMsgID: msg.ServerMsgID,
|
||||||
|
CallbackCommand: command,
|
||||||
|
ClientMsgID: msg.ClientMsgID,
|
||||||
|
OperationID: mcontext.GetOperationID(ctx),
|
||||||
|
SenderPlatformID: msg.SenderPlatformID,
|
||||||
|
SenderNickname: msg.SenderNickname,
|
||||||
|
SessionType: msg.SessionType,
|
||||||
|
MsgFrom: msg.MsgFrom,
|
||||||
|
ContentType: msg.ContentType,
|
||||||
|
Status: msg.Status,
|
||||||
|
SendTime: msg.SendTime,
|
||||||
|
CreateTime: msg.CreateTime,
|
||||||
|
AtUserIDList: msg.AtUserIDList,
|
||||||
|
SenderFaceURL: msg.SenderFaceURL,
|
||||||
|
Content: GetContent(msg),
|
||||||
|
Seq: uint32(msg.Seq),
|
||||||
|
Ex: msg.Ex,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetContent(msg *sdkws.MsgData) string {
|
||||||
|
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
|
||||||
|
var tips sdkws.TipsComm
|
||||||
|
_ = proto.Unmarshal(msg.Content, &tips)
|
||||||
|
content := tips.JsonDetail
|
||||||
|
return content
|
||||||
|
} else {
|
||||||
|
return string(msg.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||||
|
if msg.ContentType == constant.Typing {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !filterAfterMsg(msg, after) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||||
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||||
|
RecvID: msg.RecvID,
|
||||||
|
}
|
||||||
|
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||||
|
if msg.ContentType == constant.Typing {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !filterAfterMsg(msg, after) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||||
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||||
|
GroupID: msg.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||||
|
keyMsgData := apistruct.KeyMsgData{
|
||||||
|
SendID: msg.SendID,
|
||||||
|
RecvID: msg.RecvID,
|
||||||
|
GroupID: msg.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return map[string]string{
|
||||||
|
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool {
|
||||||
|
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool {
|
||||||
|
// According to the attentionIds configuration, only some users are sent
|
||||||
|
if len(attentionIds) != 0 && msg.ContentType == constant.SingleChatType && !datautil.Contain(msg.RecvID, attentionIds...) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(attentionIds) != 0 && msg.ContentType == constant.ReadGroupChatType && !datautil.Contain(msg.GroupID, attentionIds...) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if defaultDeniedTypes(msg.ContentType) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultDeniedTypes(contentType int32) bool {
|
||||||
|
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if contentType == constant.Typing {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
@ -58,7 +58,7 @@ type Config struct {
|
|||||||
Index conf.Index
|
Index conf.Index
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||||
|
|
||||||
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
|
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
|
||||||
@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase)
|
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config)
|
||||||
|
|
||||||
msgTransfer := &MsgTransfer{
|
msgTransfer := &MsgTransfer{
|
||||||
historyConsumer: historyConsumer,
|
historyConsumer: historyConsumer,
|
||||||
|
@ -19,6 +19,8 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbmsg "github.com/openimsdk/protocol/msg"
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
@ -26,11 +28,15 @@ import (
|
|||||||
|
|
||||||
type OnlineHistoryMongoConsumerHandler struct {
|
type OnlineHistoryMongoConsumerHandler struct {
|
||||||
msgTransferDatabase controller.MsgTransferDatabase
|
msgTransferDatabase controller.MsgTransferDatabase
|
||||||
|
config *Config
|
||||||
|
webhookClient *webhook.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler {
|
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler {
|
||||||
return &OnlineHistoryMongoConsumerHandler{
|
return &OnlineHistoryMongoConsumerHandler{
|
||||||
msgTransferDatabase: database,
|
msgTransferDatabase: database,
|
||||||
|
config: config,
|
||||||
|
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
|
|||||||
} else {
|
} else {
|
||||||
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, msgData := range msgFromMQ.MsgData {
|
||||||
|
switch msgData.SessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
|
||||||
|
case constant.ReadGroupChatType:
|
||||||
|
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//var seqs []int64
|
//var seqs []int64
|
||||||
//for _, msg := range msgFromMQ.MsgData {
|
//for _, msg := range msgFromMQ.MsgData {
|
||||||
// seqs = append(seqs, msg.Seq)
|
// seqs = append(seqs, msg.Seq)
|
||||||
|
@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
|
|||||||
return &pbpush.DelUserPushTokenResp{}, nil
|
return &pbpush.DelUserPushTokenResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||||
rdb, err := dbb.Redis(ctx)
|
rdb, err := dbb.Redis(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -59,7 +59,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||||
rdb, err := dbb.Redis(ctx)
|
rdb, err := dbb.Redis(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -69,7 +69,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||||
mgocli, err := dbb.Mongo(ctx)
|
mgocli, err := dbb.Mongo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -76,7 +76,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||||
mgocli, err := dbb.Mongo(ctx)
|
mgocli, err := dbb.Mongo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,13 +16,10 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
|
||||||
|
|
||||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -89,19 +86,20 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
// Move to msgtransfer
|
||||||
if msg.MsgData.ContentType == constant.Typing {
|
// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
||||||
return
|
// if msg.MsgData.ContentType == constant.Typing {
|
||||||
}
|
// return
|
||||||
if !filterAfterMsg(msg, after) {
|
// }
|
||||||
return
|
// if !filterAfterMsg(msg, after) {
|
||||||
}
|
// return
|
||||||
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
// }
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||||
RecvID: msg.MsgData.RecvID,
|
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||||
}
|
// RecvID: msg.MsgData.RecvID,
|
||||||
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
// }
|
||||||
}
|
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
|
// }
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
@ -123,20 +121,21 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
// Move to msgtransfer
|
||||||
if msg.MsgData.ContentType == constant.Typing {
|
// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
||||||
return
|
// if msg.MsgData.ContentType == constant.Typing {
|
||||||
}
|
// return
|
||||||
if !filterAfterMsg(msg, after) {
|
// }
|
||||||
return
|
// if !filterAfterMsg(msg, after) {
|
||||||
}
|
// return
|
||||||
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
// }
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||||
GroupID: msg.MsgData.GroupID,
|
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||||
}
|
// GroupID: msg.MsgData.GroupID,
|
||||||
|
// }
|
||||||
|
|
||||||
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
|
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
@ -205,14 +204,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
|
|||||||
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||||
keyMsgData := apistruct.KeyMsgData{
|
// keyMsgData := apistruct.KeyMsgData{
|
||||||
SendID: msg.SendID,
|
// SendID: msg.SendID,
|
||||||
RecvID: msg.RecvID,
|
// RecvID: msg.RecvID,
|
||||||
GroupID: msg.GroupID,
|
// GroupID: msg.GroupID,
|
||||||
}
|
// }
|
||||||
|
|
||||||
return map[string]string{
|
// return map[string]string{
|
||||||
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
|
|||||||
go m.setConversationAtInfo(ctx, req.MsgData)
|
go m.setConversationAtInfo(ctx, req.MsgData)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||||
|
|
||||||
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
||||||
resp = &pbmsg.SendMsgResp{}
|
resp = &pbmsg.SendMsgResp{}
|
||||||
resp.SendTime = req.MsgData.SendTime
|
resp.SendTime = req.MsgData.SendTime
|
||||||
@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
|
||||||
|
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||||
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
||||||
return &pbmsg.SendMsgResp{
|
return &pbmsg.SendMsgResp{
|
||||||
ServerMsgID: req.MsgData.ServerMsgID,
|
ServerMsgID: req.MsgData.ServerMsgID,
|
||||||
|
@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||||
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
|
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -66,7 +66,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||||
mgocli, err := dbb.Mongo(ctx)
|
mgocli, err := dbb.Mongo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -192,7 +192,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
|
|||||||
FromUserID: req.OwnerUserID,
|
FromUserID: req.OwnerUserID,
|
||||||
ToUserID: userID,
|
ToUserID: userID,
|
||||||
HandleResult: constant.FriendResponseAgree,
|
HandleResult: constant.FriendResponseAgree,
|
||||||
})
|
}, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
||||||
@ -221,7 +221,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
||||||
s.notificationSender.FriendApplicationAgreedNotification(ctx, req)
|
s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
if req.HandleResult == constant.FriendResponseRefuse {
|
if req.HandleResult == constant.FriendResponseRefuse {
|
||||||
|
@ -171,12 +171,18 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.
|
|||||||
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) {
|
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
|
||||||
request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
var (
|
||||||
|
request *sdkws.FriendRequest
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if checkReq {
|
||||||
|
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
tips := sdkws.FriendApplicationApprovedTips{
|
tips := sdkws.FriendApplicationApprovedTips{
|
||||||
FromToUserID: &sdkws.FromToUserID{
|
FromToUserID: &sdkws.FromToUserID{
|
||||||
FromUserID: req.FromUserID,
|
FromUserID: req.FromUserID,
|
||||||
|
@ -64,7 +64,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||||
mgocli, err := dbb.Mongo(ctx)
|
mgocli, err := dbb.Mongo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -79,7 +79,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||||
mgocli, err := dbb.Mongo(ctx)
|
mgocli, err := dbb.Mongo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -25,7 +25,7 @@ type Config struct {
|
|||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||||
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
||||||
if conf.CronTask.RetainChatRecords < 1 {
|
if conf.CronTask.RetainChatRecords < 1 {
|
||||||
log.ZInfo(ctx, "disable cron")
|
log.ZInfo(ctx, "disable cron")
|
||||||
@ -58,6 +58,11 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
|||||||
cm.Watch(ctx)
|
cm.Watch(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
srv := &cronServer{
|
srv := &cronServer{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
config: conf,
|
config: conf,
|
||||||
@ -65,6 +70,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
|||||||
msgClient: msg.NewMsgClient(msgConn),
|
msgClient: msg.NewMsgClient(msgConn),
|
||||||
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||||
thirdClient: third.NewThirdClient(thirdConn),
|
thirdClient: third.NewThirdClient(thirdConn),
|
||||||
|
locker: locker,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := srv.registerClearS3(); err != nil {
|
if err := srv.registerClearS3(); err != nil {
|
||||||
@ -81,6 +87,8 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
|||||||
log.ZDebug(ctx, "cron task server is running")
|
log.ZDebug(ctx, "cron task server is running")
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
log.ZDebug(ctx, "cron task server is shutting down")
|
log.ZDebug(ctx, "cron task server is shutting down")
|
||||||
|
srv.cron.Stop()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,6 +99,7 @@ type cronServer struct {
|
|||||||
msgClient msg.MsgClient
|
msgClient msg.MsgClient
|
||||||
conversationClient pbconversation.ConversationClient
|
conversationClient pbconversation.ConversationClient
|
||||||
thirdClient third.ThirdClient
|
thirdClient third.ThirdClient
|
||||||
|
locker *EtcdLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cronServer) registerClearS3() error {
|
func (c *cronServer) registerClearS3() error {
|
||||||
@ -98,7 +107,9 @@ func (c *cronServer) registerClearS3() error {
|
|||||||
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
|
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||||
|
c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3)
|
||||||
|
})
|
||||||
return errs.WrapMsg(err, "failed to register clear s3 cron task")
|
return errs.WrapMsg(err, "failed to register clear s3 cron task")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,11 +118,15 @@ func (c *cronServer) registerDeleteMsg() error {
|
|||||||
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
|
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||||
|
c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg)
|
||||||
|
})
|
||||||
return errs.WrapMsg(err, "failed to register delete msg cron task")
|
return errs.WrapMsg(err, "failed to register delete msg cron task")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cronServer) registerClearUserMsg() error {
|
func (c *cronServer) registerClearUserMsg() error {
|
||||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||||
|
c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg)
|
||||||
|
})
|
||||||
return errs.WrapMsg(err, "failed to register clear user msg cron task")
|
return errs.WrapMsg(err, "failed to register clear user msg cron task")
|
||||||
}
|
}
|
||||||
|
89
internal/tools/cron/dist_look.go
Normal file
89
internal/tools/cron/dist_look.go
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
package cron
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"go.etcd.io/etcd/client/v3/concurrency"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
lockLeaseTTL = 300
|
||||||
|
)
|
||||||
|
|
||||||
|
// EtcdLocker is a best-effort distributed lock built on etcd, used so that a
// scheduled cron task runs on at most one server instance at a time.
type EtcdLocker struct {
	// client is the shared etcd client; the locker does not own or close it.
	client *clientv3.Client
	// instanceID uniquely identifies this process (hostname-pid-timestamp)
	// and is used only for log correlation.
	instanceID string
}
|
||||||
|
|
||||||
|
// NewEtcdLocker creates a new etcd distributed lock
|
||||||
|
func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
pid := os.Getpid()
|
||||||
|
instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano())
|
||||||
|
|
||||||
|
locker := &EtcdLocker{
|
||||||
|
client: client,
|
||||||
|
instanceID: instanceID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return locker, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) {
|
||||||
|
session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL))
|
||||||
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "Failed to create etcd session", err,
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
lockKey := fmt.Sprintf("openim/crontask/%s", taskName)
|
||||||
|
mutex := concurrency.NewMutex(session, lockKey)
|
||||||
|
|
||||||
|
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err = mutex.TryLock(ctxWithTimeout)
|
||||||
|
if err != nil {
|
||||||
|
if err == context.DeadlineExceeded {
|
||||||
|
log.ZDebug(ctx, "Task is being executed by another instance, skipping",
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
} else {
|
||||||
|
log.ZWarn(ctx, "Failed to acquire task lock", err,
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err := mutex.Unlock(ctx); err != nil {
|
||||||
|
log.ZWarn(ctx, "Failed to release task lock", err,
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
} else {
|
||||||
|
log.ZInfo(ctx, "Successfully released task lock",
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.ZInfo(ctx, "Successfully acquired task lock, starting execution",
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID,
|
||||||
|
"sessionID", session.Lease())
|
||||||
|
|
||||||
|
task()
|
||||||
|
|
||||||
|
log.ZInfo(ctx, "Task execution completed",
|
||||||
|
"taskName", taskName,
|
||||||
|
"instanceID", e.instanceID)
|
||||||
|
}
|
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/api"
|
"github.com/openimsdk/open-im-server/v3/internal/api"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
||||||
"github.com/openimsdk/open-im-server/v3/version"
|
"github.com/openimsdk/open-im-server/v3/version"
|
||||||
"github.com/openimsdk/tools/system/program"
|
"github.com/openimsdk/tools/system/program"
|
||||||
@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error {
|
|||||||
a.apiConfig.API.Api.ListenIP, "",
|
a.apiConfig.API.Api.ListenIP, "",
|
||||||
a.apiConfig.API.Prometheus.AutoSetPorts,
|
a.apiConfig.API.Prometheus.AutoSetPorts,
|
||||||
nil, int(a.apiConfig.Index),
|
nil, int(a.apiConfig.Index),
|
||||||
a.apiConfig.Discovery.RpcService.MessageGateway,
|
prommetrics.APIKeyName,
|
||||||
&a.apiConfig.Notification,
|
&a.apiConfig.Notification,
|
||||||
a.apiConfig,
|
a.apiConfig,
|
||||||
[]string{},
|
[]string{},
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
|
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
||||||
"github.com/openimsdk/open-im-server/v3/version"
|
"github.com/openimsdk/open-im-server/v3/version"
|
||||||
"github.com/openimsdk/tools/system/program"
|
"github.com/openimsdk/tools/system/program"
|
||||||
@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error {
|
|||||||
"", "",
|
"", "",
|
||||||
true,
|
true,
|
||||||
nil, int(m.msgTransferConfig.Index),
|
nil, int(m.msgTransferConfig.Index),
|
||||||
"",
|
prommetrics.MessageTransferKeyName,
|
||||||
nil,
|
nil,
|
||||||
m.msgTransferConfig,
|
m.msgTransferConfig,
|
||||||
[]string{},
|
[]string{},
|
||||||
|
@ -327,10 +327,18 @@ type Redis struct {
|
|||||||
Address []string `yaml:"address"`
|
Address []string `yaml:"address"`
|
||||||
Username string `yaml:"username"`
|
Username string `yaml:"username"`
|
||||||
Password string `yaml:"password"`
|
Password string `yaml:"password"`
|
||||||
ClusterMode bool `yaml:"clusterMode"`
|
RedisMode string `yaml:"redisMode"`
|
||||||
DB int `yaml:"storage"`
|
DB int `yaml:"db"`
|
||||||
MaxRetry int `yaml:"maxRetry"`
|
MaxRetry int `yaml:"maxRetry"`
|
||||||
PoolSize int `yaml:"poolSize"`
|
PoolSize int `yaml:"poolSize"`
|
||||||
|
SentinelMode Sentinel `yaml:"sentinelMode"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Sentinel struct {
|
||||||
|
MasterName string `yaml:"masterName"`
|
||||||
|
SentinelAddrs []string `yaml:"sentinelsAddrs"`
|
||||||
|
RouteByLatency bool `yaml:"routeByLatency"`
|
||||||
|
RouteRandomly bool `yaml:"routeRandomly"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type BeforeConfig struct {
|
type BeforeConfig struct {
|
||||||
@ -487,13 +495,19 @@ func (m *Mongo) Build() *mongoutil.Config {
|
|||||||
|
|
||||||
func (r *Redis) Build() *redisutil.Config {
|
func (r *Redis) Build() *redisutil.Config {
|
||||||
return &redisutil.Config{
|
return &redisutil.Config{
|
||||||
ClusterMode: r.ClusterMode,
|
RedisMode: r.RedisMode,
|
||||||
Address: r.Address,
|
Address: r.Address,
|
||||||
Username: r.Username,
|
Username: r.Username,
|
||||||
Password: r.Password,
|
Password: r.Password,
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
MaxRetry: r.MaxRetry,
|
MaxRetry: r.MaxRetry,
|
||||||
PoolSize: r.PoolSize,
|
PoolSize: r.PoolSize,
|
||||||
|
Sentinel: &redisutil.Sentinel{
|
||||||
|
MasterName: r.SentinelMode.MasterName,
|
||||||
|
SentinelAddrs: r.SentinelMode.SentinelAddrs,
|
||||||
|
RouteByLatency: r.SentinelMode.RouteByLatency,
|
||||||
|
RouteRandomly: r.SentinelMode.RouteRandomly,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (dis
|
|||||||
return standalone.GetSvcDiscoveryRegistry(), nil
|
return standalone.GetSvcDiscoveryRegistry(), nil
|
||||||
}
|
}
|
||||||
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
|
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
|
||||||
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
|
return kubernetes.NewConnManager(discovery.Kubernetes.Namespace, nil,
|
||||||
grpc.WithDefaultCallOptions(
|
grpc.WithDefaultCallOptions(
|
||||||
grpc.MaxCallSendMsgSize(1024*1024*20),
|
grpc.MaxCallSendMsgSize(1024*1024*20),
|
||||||
),
|
),
|
||||||
|
@ -85,6 +85,8 @@ func Start(listener net.Listener) error {
|
|||||||
const (
|
const (
|
||||||
APIKeyName = "api"
|
APIKeyName = "api"
|
||||||
MessageTransferKeyName = "message-transfer"
|
MessageTransferKeyName = "message-transfer"
|
||||||
|
|
||||||
|
TTL = 300
|
||||||
)
|
)
|
||||||
|
|
||||||
type Target struct {
|
type Target struct {
|
||||||
@ -97,10 +99,14 @@ type RespTarget struct {
|
|||||||
Labels map[string]string `json:"labels"`
|
Labels map[string]string `json:"labels"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildDiscoveryKey(name string) string {
|
func BuildDiscoveryKeyPrefix(name string) string {
|
||||||
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
|
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BuildDiscoveryKey(name string, index int) string {
|
||||||
|
return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index)
|
||||||
|
}
|
||||||
|
|
||||||
func BuildDefaultTarget(host string, ip int) Target {
|
func BuildDefaultTarget(host string, ip int) Target {
|
||||||
return Target{
|
return Target{
|
||||||
Target: fmt.Sprintf("%s:%d", host, ip),
|
Target: fmt.Sprintf("%s:%d", host, ip),
|
||||||
|
@ -50,7 +50,7 @@ func init() {
|
|||||||
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
|
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
|
||||||
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
|
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
|
||||||
watchConfigNames []string, watchServiceNames []string,
|
watchConfigNames []string, watchServiceNames []string,
|
||||||
rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error,
|
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
|
||||||
options ...grpc.ServerOption) error {
|
options ...grpc.ServerOption) error {
|
||||||
|
|
||||||
if notification != nil {
|
if notification != nil {
|
||||||
@ -148,11 +148,13 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
|
if autoSetPorts {
|
||||||
if !errors.Is(err, discovery.ErrNotSupportedKeyValue) {
|
if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil {
|
||||||
|
if !errors.Is(err, discovery.ErrNotSupported) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
err := prommetrics.Start(prometheusListener)
|
err := prommetrics.Start(prometheusListener)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -26,7 +26,7 @@ func TestName111111(t *testing.T) {
|
|||||||
"172.16.8.124:7005",
|
"172.16.8.124:7005",
|
||||||
"172.16.8.124:7006",
|
"172.16.8.124:7006",
|
||||||
},
|
},
|
||||||
ClusterMode: true,
|
RedisMode: "cluster",
|
||||||
Password: "passwd123",
|
Password: "passwd123",
|
||||||
//Address: []string{"localhost:16379"},
|
//Address: []string{"localhost:16379"},
|
||||||
//Password: "openIM123",
|
//Password: "openIM123",
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
serviceBinaries:
|
serviceBinaries:
|
||||||
openim-api: 1
|
openim-api: 1
|
||||||
openim-crontask: 1
|
openim-crontask: 4
|
||||||
openim-rpc-user: 1
|
openim-rpc-user: 1
|
||||||
openim-msggateway: 1
|
openim-msggateway: 1
|
||||||
openim-push: 8
|
openim-push: 8
|
||||||
|
@ -28,6 +28,8 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const StructTagName = "yaml"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MaxSeq = "MAX_SEQ:"
|
MaxSeq = "MAX_SEQ:"
|
||||||
MinSeq = "MIN_SEQ:"
|
MinSeq = "MIN_SEQ:"
|
||||||
@ -54,13 +56,14 @@ func readConfig[T any](dir string, name string) (*T, error) {
|
|||||||
if err := v.ReadInConfig(); err != nil {
|
if err := v.ReadInConfig(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
fn := func(config *mapstructure.DecoderConfig) {
|
|
||||||
config.TagName = "mapstructure"
|
|
||||||
}
|
|
||||||
var conf T
|
var conf T
|
||||||
if err := v.Unmarshal(&conf, fn); err != nil {
|
if err := v.Unmarshal(&conf, func(config *mapstructure.DecoderConfig) {
|
||||||
|
config.TagName = StructTagName
|
||||||
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &conf, nil
|
return &conf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,6 +72,7 @@ func Main(conf string, del time.Duration) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName)
|
mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user