merge: main

This commit is contained in:
icey-yu 2025-06-20 15:30:23 +08:00
commit 6a7f6fa8c7
50 changed files with 1058 additions and 315 deletions

2
.env
View File

@ -2,7 +2,7 @@ MONGO_IMAGE=mongo:7.0
REDIS_IMAGE=redis:7.0.0 REDIS_IMAGE=redis:7.0.0
KAFKA_IMAGE=bitnami/kafka:3.5.1 KAFKA_IMAGE=bitnami/kafka:3.5.1
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13 ETCD_IMAGE=bitnami/etcd:3.5.13
PROMETHEUS_IMAGE=prom/prometheus:v2.45.6 PROMETHEUS_IMAGE=prom/prometheus:v2.45.6
ALERTMANAGER_IMAGE=prom/alertmanager:v0.27.0 ALERTMANAGER_IMAGE=prom/alertmanager:v0.27.0
GRAFANA_IMAGE=grafana/grafana:11.0.1 GRAFANA_IMAGE=grafana/grafana:11.0.1

View File

@ -155,6 +155,17 @@ jobs:
'{title: $title, head: $head, base: $base, body: $body}')") '{title: $title, head: $head, base: $base, body: $body}')")
new_pr_number=$(echo "$response" | jq -r '.number') new_pr_number=$(echo "$response" | jq -r '.number')
if [[ "$new_pr_number" == "null" || -z "$new_pr_number" ]]; then
echo "Failed to create PR. Response: $response"
git checkout $TARGET_BRANCH
git branch -D $cherry_pick_branch
echo "Deleted branch: $cherry_pick_branch"
git push origin --delete $cherry_pick_branch
else
echo "Created PR #$new_pr_number" echo "Created PR #$new_pr_number"
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
@ -162,4 +173,9 @@ jobs:
-d '{"labels": ["milestone-merge"]}' \ -d '{"labels": ["milestone-merge"]}' \
"https://api.github.com/repos/${{ github.repository }}/issues/$new_pr_number/labels" "https://api.github.com/repos/${{ github.repository }}/issues/$new_pr_number/labels"
fi fi
echo ""
echo "----------------------------------------"
echo ""
fi
done done

View File

@ -3,7 +3,6 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"net" "net"
@ -39,7 +38,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"github.com/spf13/viper" "github.com/spf13/viper"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error {
return err return err
} }
} }
ip, err := network.GetLocalIP()
if err != nil {
return err
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil { if err != nil {
return fmt.Errorf("prometheus listen %d error %w", port, err) return fmt.Errorf("prometheus listen %d error %w", port, err)
} }
defer listener.Close() defer listener.Close()
log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) log.ZDebug(ctx, "prometheus start", "addr", listener.Addr())
target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port))
if err != nil {
return err
}
if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
return err
}
go func() { go func() {
err := prommetrics.Start(listener) err := prommetrics.Start(listener)
if err == nil { if err == nil {
@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error {
} }
} }
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) {
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
if index := strings.Index(name, "."); index >= 0 { if index := strings.Index(name, "."); index >= 0 {
name = name[:index] name = name[:index]
@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C
if err := cmd.parseConf(&conf); err != nil { if err := cmd.parseConf(&conf); err != nil {
return err return err
} }
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar())
}) })
} }

View File

@ -2,8 +2,10 @@ enable: etcd
etcd: etcd:
rootDirectory: openim rootDirectory: openim
address: [localhost:12379] address: [localhost:12379]
username: '' ## Attention: If you set auth in etcd
password: '' ## you must also update the username and password in Chat project.
username:
password:
kubernetes: kubernetes:
namespace: default namespace: default

View File

@ -1,7 +1,7 @@
# Username for authentication ## Kafka authentication
username: '' username:
# Password for authentication password:
password: ''
# Producer acknowledgment settings # Producer acknowledgment settings
producerAck: producerAck:
# Compression type to use (e.g., none, gzip, snappy) # Compression type to use (e.g., none, gzip, snappy)

View File

@ -1,7 +1,14 @@
address: [localhost:16379] address: [localhost:16379]
username: username:
password: openIM123 password: openIM123
clusterMode: false # redis Mode, including "standalone","cluster","sentinel"
redisMode: "standalone"
db: 0 db: 0
maxRetry: 10 maxRetry: 10
poolSize: 100 poolSize: 100
# Sentinel configuration (only used when redisMode is "sentinel")
sentinelMode:
masterName: "redis-master"
sentinelsAddrs: ["127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"]
routeByLatency: true
routeRandomly: true

View File

@ -16,7 +16,7 @@ afterUpdateUserInfoEx:
afterSendSingleMsg: afterSendSingleMsg:
enable: false enable: false
timeout: 5 timeout: 5
# Only the recvID specified in attentionIds will send the callback # Only the recvIDs specified in attentionIds will send the callback
# if not set, all user messages will be callback # if not set, all user messages will be callback
attentionIds: [] attentionIds: []
# See beforeSendSingleMsg comment. # See beforeSendSingleMsg comment.
@ -36,7 +36,7 @@ beforeMsgModify:
afterSendGroupMsg: afterSendGroupMsg:
enable: false enable: false
timeout: 5 timeout: 5
# Only the recvID specified in attentionIds will send the callback # Only the GroupIDs specified in attentionIds will send the callback
# if not set, all user messages will be callback # if not set, all user messages will be callback
attentionIds: [] attentionIds: []
# See beforeSendSingleMsg comment. # See beforeSendSingleMsg comment.

View File

@ -83,8 +83,83 @@ services:
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380 - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
- ETCD_INITIAL_CLUSTER_TOKEN=tkn - ETCD_INITIAL_CLUSTER_TOKEN=tkn
- ETCD_INITIAL_CLUSTER_STATE=new - ETCD_INITIAL_CLUSTER_STATE=new
- ALLOW_NONE_AUTHENTICATION=no
## Optional: Enable etcd authentication by setting the following credentials
# - ETCD_ROOT_USER=root
# - ETCD_ROOT_PASSWORD=openIM123
# - ETCD_USERNAME=openIM
# - ETCD_PASSWORD=openIM123
volumes: volumes:
- "${DATA_DIR}/components/etcd:/etcd-data" - "${DATA_DIR}/components/etcd:/etcd-data"
command: >
/bin/sh -c '
etcd &
export ETCDCTL_API=3
echo "Waiting for etcd to become healthy..."
until etcdctl --endpoints=http://127.0.0.1:2379 endpoint health &>/dev/null; do
echo "Waiting for ETCD to start..."
sleep 1
done
echo "etcd is healthy."
if [ -n "$${ETCD_ROOT_USER}" ] && [ -n "$${ETCD_ROOT_PASSWORD}" ] && [ -n "$${ETCD_USERNAME}" ] && [ -n "$${ETCD_PASSWORD}" ]; then
echo "Authentication credentials provided. Setting up authentication..."
echo "Checking authentication status..."
if ! etcdctl --endpoints=http://127.0.0.1:2379 auth status | grep -q "Authentication Status: true"; then
echo "Authentication is disabled. Creating users and enabling..."
# Create users and setup permissions
etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_ROOT_USER} --new-user-password=$${ETCD_ROOT_PASSWORD} || true
etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} || true
etcdctl --endpoints=http://127.0.0.1:2379 role add openim-role || true
etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite / || true
etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite "" || true
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_USERNAME} openim-role || true
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true
echo "Enabling authentication..."
etcdctl --endpoints=http://127.0.0.1:2379 auth enable
echo "Authentication enabled successfully"
else
echo "Authentication is already enabled. Checking OpenIM user..."
# Check if openIM user exists and can perform operations
if ! etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-check" &>/dev/null; then
echo "OpenIM user test failed. Recreating user with root credentials..."
# Try to create/update the openIM user using root credentials
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} --no-password-file || true
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role add openim-role || true
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite / || true
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite "" || true
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user grant-role $${ETCD_USERNAME} openim-role || true
etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true
echo "OpenIM user recreated with required permissions"
else
echo "OpenIM user exists and has correct permissions"
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth &>/dev/null
fi
fi
echo "Testing authentication with OpenIM user..."
if etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-works"; then
echo "Authentication working properly"
etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth
else
echo "WARNING: Authentication test failed"
fi
else
echo "No authentication credentials provided. Running in no-auth mode."
echo "To enable authentication, set ETCD_ROOT_USER, ETCD_ROOT_PASSWORD, ETCD_USERNAME, and ETCD_PASSWORD environment variables."
fi
tail -f /dev/null
'
restart: always restart: always
networks: networks:
- openim - openim
@ -104,12 +179,38 @@ services:
KAFKA_CFG_NODE_ID: 0 KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_NUM_PARTITIONS: 8 KAFKA_NUM_PARTITIONS: 8
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
# Authentication configuration variables - comment out to disable auth
# KAFKA_USERNAME: "openIM"
# KAFKA_PASSWORD: "openIM123"
command: >
/bin/sh -c '
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
echo "=== Kafka SASL Authentication ENABLED ==="
echo "Username: $${KAFKA_USERNAME}"
# Set environment variables for SASL authentication
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
fi
# Start Kafka with the configured environment
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
'
networks: networks:
- openim - openim
@ -161,9 +262,9 @@ services:
- ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml
- ${DATA_DIR}/components/prometheus/data:/prometheus - ${DATA_DIR}/components/prometheus/data:/prometheus
command: command:
- '--config.file=/etc/prometheus/prometheus.yml' - "--config.file=/etc/prometheus/prometheus.yml"
- '--storage.tsdb.path=/prometheus' - "--storage.tsdb.path=/prometheus"
- '--web.listen-address=:${PROMETHEUS_PORT}' - "--web.listen-address=:${PROMETHEUS_PORT}"
network_mode: host network_mode: host
alertmanager: alertmanager:
@ -176,8 +277,8 @@ services:
- ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
- ./config/email.tmpl:/etc/alertmanager/email.tmpl - ./config/email.tmpl:/etc/alertmanager/email.tmpl
command: command:
- '--config.file=/etc/alertmanager/alertmanager.yml' - "--config.file=/etc/alertmanager/alertmanager.yml"
- '--web.listen-address=:${ALERTMANAGER_PORT}' - "--web.listen-address=:${ALERTMANAGER_PORT}"
network_mode: host network_mode: host
grafana: grafana:
@ -209,9 +310,8 @@ services:
- /sys:/host/sys:ro - /sys:/host/sys:ro
- /:/rootfs:ro - /:/rootfs:ro
command: command:
- '--path.procfs=/host/proc' - "--path.procfs=/host/proc"
- '--path.sysfs=/host/sys' - "--path.sysfs=/host/sys"
- '--path.rootfs=/rootfs' - "--path.rootfs=/rootfs"
- '--web.listen-address=:19100' - "--web.listen-address=:19100"
network_mode: host network_mode: host

4
go.mod
View File

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.6 github.com/openimsdk/protocol v0.0.73-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.89 github.com/openimsdk/tools v0.0.50-alpha.92
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0

8
go.sum
View File

@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5beV3ZyOsGhY= github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5beV3ZyOsGhY=
github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.89 h1:aAbWSc3gOI//+KQ70i7ilOTiLqQNotmp+bobg4Gu8qI= github.com/openimsdk/tools v0.0.50-alpha.92 h1:hWfykMhmi7EQEiwgQccJqbgggIuhun/PrVkBnjmj9Ec=
github.com/openimsdk/tools v0.0.50-alpha.89/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/openimsdk/tools v0.0.50-alpha.92/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -114,3 +114,7 @@ func (o *FriendApi) GetIncrementalBlacks(c *gin.Context) {
func (o *FriendApi) GetFullFriendUserIDs(c *gin.Context) { func (o *FriendApi) GetFullFriendUserIDs(c *gin.Context) {
a2r.Call(c, relation.FriendClient.GetFullFriendUserIDs, o.Client) a2r.Call(c, relation.FriendClient.GetFullFriendUserIDs, o.Client)
} }
func (o *FriendApi) GetSelfUnhandledApplyCount(c *gin.Context) {
a2r.Call(c, relation.FriendClient.GetSelfUnhandledApplyCount, o.Client)
}

View File

@ -165,3 +165,7 @@ func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) {
func (o *GroupApi) GetFullJoinGroupIDs(c *gin.Context) { func (o *GroupApi) GetFullJoinGroupIDs(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetFullJoinGroupIDs, o.Client) a2r.Call(c, group.GroupClient.GetFullJoinGroupIDs, o.Client)
} }
func (o *GroupApi) GetGroupApplicationUnhandledCount(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetGroupApplicationUnhandledCount, o.Client)
}

View File

@ -39,7 +39,7 @@ type Config struct {
Index conf.Index Index conf.Index
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
if err != nil { if err != nil {
return err return err

View File

@ -467,6 +467,10 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
sessionType int32 sessionType int32
recvID string recvID string
) )
if err = c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
err = json.Unmarshal(decodedData, &keyMsgData) err = json.Unmarshal(decodedData, &keyMsgData)
if err != nil { if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
@ -490,6 +494,11 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
return return
} }
content, err := jsonutil.JsonMarshal(apistruct.MarkdownTextElem{Content: req.Content})
if err != nil {
apiresp.GinError(c, errs.Wrap(err))
return
}
msgData := &sdkws.MsgData{ msgData := &sdkws.MsgData{
SendID: sendID, SendID: sendID,
RecvID: recvID, RecvID: recvID,
@ -498,17 +507,17 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
SenderPlatformID: constant.AdminPlatformID, SenderPlatformID: constant.AdminPlatformID,
SessionType: sessionType, SessionType: sessionType,
MsgFrom: constant.UserMsgType, MsgFrom: constant.UserMsgType,
ContentType: constant.Text, ContentType: constant.MarkdownText,
Content: []byte(req.Content), Content: content,
OfflinePushInfo: req.OfflinePushInfo, OfflinePushInfo: req.OfflinePushInfo,
Ex: req.Ex, Ex: req.Ex,
} }
sendReq := &msg.SendMsgReq{ sendReq := &msg.SendSimpleMsgReq{
MsgData: msgData, MsgData: msgData,
} }
respPb, err := m.Client.SendMsg(c, sendReq) respPb, err := m.Client.SendSimpleMsg(c, sendReq)
if err != nil { if err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
@ -525,7 +534,12 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
return return
} }
m.ginRespSendMsg(c, sendReq, respPb) m.ginRespSendMsg(c, &msg.SendMsgReq{MsgData: sendReq.MsgData}, &msg.SendMsgResp{
ServerMsgID: respPb.ServerMsgID,
ClientMsgID: respPb.ClientMsgID,
SendTime: respPb.SendTime,
Modify: respPb.Modify,
})
} }
func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) {

View File

@ -6,35 +6,29 @@ import (
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
clientv3 "go.etcd.io/etcd/client/v3"
) )
type PrometheusDiscoveryApi struct { type PrometheusDiscoveryApi struct {
config *Config config *Config
client *clientv3.Client
kv discovery.KeyValue kv discovery.KeyValue
} }
func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi {
api := &PrometheusDiscoveryApi{ api := &PrometheusDiscoveryApi{
config: config, config: config,
} kv: client,
if config.Discovery.Enable == conf.ETCD {
api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
} }
return api return api
} }
func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key))
if err != nil { if err != nil {
if errors.Is(err, discovery.ErrNotSupportedKeyValue) { if errors.Is(err, discovery.ErrNotSupported) {
c.JSON(http.StatusOK, []struct{}{}) c.JSON(http.StatusOK, []struct{}{})
return return
} }
@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
return return
} }
var resp prommetrics.RespTarget var resp prommetrics.RespTarget
if err := json.Unmarshal(value, &resp); err != nil { for i := range value {
var tmp prommetrics.Target
if err = json.Unmarshal(value[i], &tmp); err != nil {
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
return return
} }
resp.Targets = append(resp.Targets, tmp.Target)
resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget
}
c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp})
} }

View File

@ -28,6 +28,7 @@ import (
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/mw/api"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
@ -53,7 +54,7 @@ func prommetricsGin() gin.HandlerFunc {
} }
} }
func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
if err != nil { if err != nil {
return nil, err return nil, err
@ -96,7 +97,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
case BestSpeed: case BestSpeed:
r.Use(gzip.Gzip(gzip.BestSpeed)) r.Use(gzip.Gzip(gzip.BestSpeed))
} }
r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), r.Use(api.GinLogger(), prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(),
mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn)), setGinIsAdmin(cfg.Share.IMAdminUserID)) mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn)), setGinIsAdmin(cfg.Share.IMAdminUserID))
u := NewUserApi(user.NewUserClient(userConn), client, cfg.Discovery.RpcService) u := NewUserApi(user.NewUserClient(userConn), client, cfg.Discovery.RpcService)
@ -156,6 +157,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
friendRouterGroup.POST("/update_friends", f.UpdateFriends) friendRouterGroup.POST("/update_friends", f.UpdateFriends)
friendRouterGroup.POST("/get_incremental_friends", f.GetIncrementalFriends) friendRouterGroup.POST("/get_incremental_friends", f.GetIncrementalFriends)
friendRouterGroup.POST("/get_full_friend_user_ids", f.GetFullFriendUserIDs) friendRouterGroup.POST("/get_full_friend_user_ids", f.GetFullFriendUserIDs)
friendRouterGroup.POST("/get_self_unhandled_apply_count", f.GetSelfUnhandledApplyCount)
} }
g := NewGroupApi(group.NewGroupClient(groupConn)) g := NewGroupApi(group.NewGroupClient(groupConn))
@ -192,6 +194,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
groupRouterGroup.POST("/get_incremental_group_members_batch", g.GetIncrementalGroupMemberBatch) groupRouterGroup.POST("/get_incremental_group_members_batch", g.GetIncrementalGroupMemberBatch)
groupRouterGroup.POST("/get_full_group_member_user_ids", g.GetFullGroupMemberUserIDs) groupRouterGroup.POST("/get_full_group_member_user_ids", g.GetFullGroupMemberUserIDs)
groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs) groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs)
groupRouterGroup.POST("/get_group_application_unhandled_count", g.GetGroupApplicationUnhandledCount)
} }
// certificate // certificate
{ {
@ -250,6 +253,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
msgGroup.POST("/delete_msg_physical", m.DeleteMsgPhysical) msgGroup.POST("/delete_msg_physical", m.DeleteMsgPhysical)
msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/batch_send_msg", m.BatchSendMsg)
msgGroup.POST("/send_simple_msg", m.SendSimpleMessage)
msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess)
msgGroup.POST("/get_server_time", m.GetServerTime) msgGroup.POST("/get_server_time", m.GetServerTime)
} }

View File

@ -39,7 +39,7 @@ type Config struct {
} }
// Start run ws server. // Start run ws server.
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
"rpcPorts", conf.MsgGateway.RPC.Ports, "rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)

View File

@ -0,0 +1,132 @@
package msgtransfer
import (
"context"
"encoding/base64"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
)
func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq {
return cbapi.CommonCallbackReq{
SendID: msg.SendID,
ServerMsgID: msg.ServerMsgID,
CallbackCommand: command,
ClientMsgID: msg.ClientMsgID,
OperationID: mcontext.GetOperationID(ctx),
SenderPlatformID: msg.SenderPlatformID,
SenderNickname: msg.SenderNickname,
SessionType: msg.SessionType,
MsgFrom: msg.MsgFrom,
ContentType: msg.ContentType,
Status: msg.Status,
SendTime: msg.SendTime,
CreateTime: msg.CreateTime,
AtUserIDList: msg.AtUserIDList,
SenderFaceURL: msg.SenderFaceURL,
Content: GetContent(msg),
Seq: uint32(msg.Seq),
Ex: msg.Ex,
}
}
func GetContent(msg *sdkws.MsgData) string {
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
var tips sdkws.TipsComm
_ = proto.Unmarshal(msg.Content, &tips)
content := tips.JsonDetail
return content
} else {
return string(msg.Content)
}
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
}
if !filterAfterMsg(msg, after) {
return
}
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.RecvID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
}
if !filterAfterMsg(msg, after) {
return
}
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
GroupID: msg.GroupID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
}
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
keyMsgData := apistruct.KeyMsgData{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
}
return map[string]string{
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
}
}
func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool {
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
}
func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool {
// According to the attentionIds configuration, only some users are sent
if len(attentionIds) != 0 && msg.ContentType == constant.SingleChatType && !datautil.Contain(msg.RecvID, attentionIds...) {
return false
}
if len(attentionIds) != 0 && msg.ContentType == constant.ReadGroupChatType && !datautil.Contain(msg.GroupID, attentionIds...) {
return false
}
if defaultDeniedTypes(msg.ContentType) {
return false
}
if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) {
return false
}
return true
}
func defaultDeniedTypes(contentType int32) bool {
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
return true
}
if contentType == constant.Typing {
return true
}
return false
}

View File

@ -58,7 +58,7 @@ type Config struct {
Index conf.Index Index conf.Index
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.KafkaConfig) builder := mqbuild.NewBuilder(&config.KafkaConfig)
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil { if err != nil {
return err return err
} }
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config)
msgTransfer := &MsgTransfer{ msgTransfer := &MsgTransfer{
historyConsumer: historyConsumer, historyConsumer: historyConsumer,

View File

@ -19,6 +19,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg" pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -26,11 +28,15 @@ import (
type OnlineHistoryMongoConsumerHandler struct { type OnlineHistoryMongoConsumerHandler struct {
msgTransferDatabase controller.MsgTransferDatabase msgTransferDatabase controller.MsgTransferDatabase
config *Config
webhookClient *webhook.Client
} }
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler {
return &OnlineHistoryMongoConsumerHandler{ return &OnlineHistoryMongoConsumerHandler{
msgTransferDatabase: database, msgTransferDatabase: database,
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
} }
} }
@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
} else { } else {
prommetrics.MsgInsertMongoSuccessCounter.Inc() prommetrics.MsgInsertMongoSuccessCounter.Inc()
} }
for _, msgData := range msgFromMQ.MsgData {
switch msgData.SessionType {
case constant.SingleChatType:
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
case constant.ReadGroupChatType:
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
}
}
//var seqs []int64 //var seqs []int64
//for _, msg := range msgFromMQ.MsgData { //for _, msg := range msgFromMQ.MsgData {
// seqs = append(seqs, msg.Seq) // seqs = append(seqs, msg.Seq)

View File

@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache"
@ -49,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
return &pbpush.DelUserPushTokenResp{}, nil return &pbpush.DelUserPushTokenResp{}, nil
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
rdb, err := dbb.Redis(ctx) rdb, err := dbb.Redis(ctx)
if err != nil { if err != nil {
@ -106,7 +107,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
go func() { go func() {
pushHandler.WaitCache() pushHandler.WaitCache()
fn := func(ctx context.Context, key string, value []byte) error { fn := func(ctx context.Context, key string, value []byte) error {
pushHandler.HandleMs2PsChat(ctx, value) pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value)
return nil return nil
} }
consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32())))

View File

@ -59,7 +59,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
rdb, err := dbb.Redis(ctx) rdb, err := dbb.Redis(ctx)
if err != nil { if err != nil {

View File

@ -69,7 +69,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx) mgocli, err := dbb.Mongo(ctx)
if err != nil { if err != nil {
@ -432,6 +432,9 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversation.ConversationID, req.UserIDs, 0); err != nil {
return nil, err
}
c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation) c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation)
return &pbconversation.CreateGroupChatConversationsResp{}, nil return &pbconversation.CreateGroupChatConversationsResp{}, nil

View File

@ -17,7 +17,6 @@ package group
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
) )
@ -34,7 +33,7 @@ func (g *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGro
} }
func (g *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) { func (g *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) {
if err := authverify.CheckAccess(ctx, req.GroupMemberID); err != nil { if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err return nil, err
} }
members, err := g.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID) members, err := g.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID)

View File

@ -25,7 +25,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -77,7 +76,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx) mgocli, err := dbb.Mongo(ctx)
if err != nil { if err != nil {
@ -153,10 +152,14 @@ func (g *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro
func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error {
if !authverify.IsAdmin(ctx) { if !authverify.IsAdmin(ctx) {
groupMember, err := g.db.TakeGroupMember(ctx, groupID, mcontext.GetOpUserID(ctx)) members, err := g.db.FindGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)})
if err != nil { if err != nil {
return err return err
} }
if len(members) == 0 {
return errs.ErrNoPermission.WrapMsg("op user not in group")
}
groupMember := members[0]
if !(groupMember.RoleLevel == constant.GroupOwner || groupMember.RoleLevel == constant.GroupAdmin) { if !(groupMember.RoleLevel == constant.GroupOwner || groupMember.RoleLevel == constant.GroupAdmin) {
return errs.ErrNoPermission.WrapMsg("no group owner or admin") return errs.ErrNoPermission.WrapMsg("no group owner or admin")
} }
@ -369,6 +372,10 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed") return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed")
} }
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
userMap, err := g.userClient.GetUsersInfoMap(ctx, req.InvitedUserIDs) userMap, err := g.userClient.GetUsersInfoMap(ctx, req.InvitedUserIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -379,9 +386,9 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
} }
var groupMember *model.GroupMember var groupMember *model.GroupMember
var opUserID string opUserID := mcontext.GetOpUserID(ctx)
if !authverify.IsAdmin(ctx) { if !authverify.IsAdmin(ctx) {
opUserID = mcontext.GetOpUserID(ctx)
var err error var err error
groupMember, err = g.db.TakeGroupMember(ctx, req.GroupID, opUserID) groupMember, err = g.db.TakeGroupMember(ctx, req.GroupID, opUserID)
if err != nil { if err != nil {
@ -390,8 +397,6 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
if err := g.PopulateGroupMember(ctx, groupMember); err != nil { if err := g.PopulateGroupMember(ctx, groupMember); err != nil {
return nil, err return nil, err
} }
} else {
opUserID = mcontext.GetOpUserID(ctx)
} }
if err := g.webhookBeforeInviteUserToGroup(ctx, &g.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue { if err := g.webhookBeforeInviteUserToGroup(ctx, &g.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue {
@ -421,7 +426,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
ReqMessage: request.ReqMsg, ReqMessage: request.ReqMsg,
JoinSource: request.JoinSource, JoinSource: request.JoinSource,
InviterUserID: request.InviterUserID, InviterUserID: request.InviterUserID,
}) }, request)
} }
return &pbgroup.InviteUserToGroupResp{}, nil return &pbgroup.InviteUserToGroupResp{}, nil
} }
@ -450,10 +455,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
const singleQuantity = 50 const singleQuantity = 50
for start := 0; start < len(groupMembers); start += singleQuantity { for start := 0; start < len(groupMembers); start += singleQuantity {
end := start + singleQuantity end := min(start+singleQuantity, len(groupMembers))
if end > len(groupMembers) {
end = len(groupMembers)
}
currentMembers := groupMembers[start:end] currentMembers := groupMembers[start:end]
if err := g.db.CreateGroup(ctx, nil, currentMembers); err != nil { if err := g.db.CreateGroup(ctx, nil, currentMembers); err != nil {
@ -464,8 +466,8 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return e.UserID return e.UserID
}) })
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...); err != nil { if len(userIDs) != 0 {
return nil, err g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...)
} }
} }
return &pbgroup.InviteUserToGroupResp{}, nil return &pbgroup.InviteUserToGroupResp{}, nil
@ -690,15 +692,34 @@ func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.
if err := authverify.CheckAccess(ctx, req.FromUserID); err != nil { if err := authverify.CheckAccess(ctx, req.FromUserID); err != nil {
return nil, err return nil, err
} }
groupIDs, err := g.db.FindUserManagedGroupID(ctx, req.FromUserID) var (
groupIDs []string
err error
)
if len(req.GroupIDs) == 0 {
groupIDs, err = g.db.FindUserManagedGroupID(ctx, req.FromUserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else {
req.GroupIDs = datautil.Distinct(req.GroupIDs)
if !authverify.IsAdmin(ctx) {
for _, groupID := range req.GroupIDs {
if err := g.CheckGroupAdmin(ctx, groupID); err != nil {
return nil, err
}
}
}
groupIDs = req.GroupIDs
}
resp := &pbgroup.GetGroupApplicationListResp{} resp := &pbgroup.GetGroupApplicationListResp{}
if len(groupIDs) == 0 { if len(groupIDs) == 0 {
return resp, nil return resp, nil
} }
total, groupRequests, err := g.db.PageGroupRequest(ctx, groupIDs, req.Pagination) handleResults := datautil.Slice(req.HandleResults, func(e int32) int {
return int(e)
})
total, groupRequests, err := g.db.PageGroupRequest(ctx, groupIDs, handleResults, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -763,6 +784,23 @@ func (g *groupServer) GetGroupsInfo(ctx context.Context, req *pbgroup.GetGroupsI
}, nil }, nil
} }
func (g *groupServer) GetGroupApplicationUnhandledCount(ctx context.Context, req *pbgroup.GetGroupApplicationUnhandledCountReq) (*pbgroup.GetGroupApplicationUnhandledCountResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
groupIDs, err := g.db.FindUserManagedGroupID(ctx, req.UserID)
if err != nil {
return nil, err
}
count, err := g.db.GetGroupApplicationUnhandledCount(ctx, groupIDs, req.Time)
if err != nil {
return nil, err
}
return &pbgroup.GetGroupApplicationUnhandledCountResp{
Count: count,
}, nil
}
func (g *groupServer) getGroupsInfo(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { func (g *groupServer) getGroupsInfo(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {
if len(groupIDs) == 0 { if len(groupIDs) == 0 {
return nil, nil return nil, nil
@ -944,7 +982,7 @@ func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
if err = g.db.CreateGroupRequest(ctx, []*model.GroupRequest{&groupRequest}); err != nil { if err = g.db.CreateGroupRequest(ctx, []*model.GroupRequest{&groupRequest}); err != nil {
return nil, err return nil, err
} }
g.notification.JoinGroupApplicationNotification(ctx, req) g.notification.JoinGroupApplicationNotification(ctx, req, &groupRequest)
return &pbgroup.JoinGroupResp{}, nil return &pbgroup.JoinGroupResp{}, nil
} }
@ -1308,6 +1346,9 @@ func (g *groupServer) GetGroups(ctx context.Context, req *pbgroup.GetGroupsReq)
} }
func (g *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGroupMembersCMSReq) (*pbgroup.GetGroupMembersCMSResp, error) { func (g *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGroupMembersCMSReq) (*pbgroup.GetGroupMembersCMSResp, error) {
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
total, members, err := g.db.SearchGroupMember(ctx, req.UserName, req.GroupID, req.Pagination) total, members, err := g.db.SearchGroupMember(ctx, req.UserName, req.GroupID, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1324,11 +1365,17 @@ func (g *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGr
} }
func (g *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgroup.GetUserReqApplicationListReq) (*pbgroup.GetUserReqApplicationListResp, error) { func (g *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgroup.GetUserReqApplicationListReq) (*pbgroup.GetUserReqApplicationListResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
user, err := g.userClient.GetUserInfo(ctx, req.UserID) user, err := g.userClient.GetUserInfo(ctx, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
total, requests, err := g.db.PageGroupRequestUser(ctx, req.UserID, req.Pagination) handleResults := datautil.Slice(req.HandleResults, func(e int32) int {
return int(e)
})
total, requests, err := g.db.PageGroupRequestUser(ctx, req.UserID, req.GroupIDs, handleResults, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1717,6 +1764,9 @@ func (g *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbgroup.Ge
if len(req.GroupIDs) == 0 { if len(req.GroupIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("groupIDs empty") return nil, errs.ErrArgs.WrapMsg("groupIDs empty")
} }
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
members, err := g.db.FindGroupMemberUser(ctx, req.GroupIDs, req.UserID) members, err := g.db.FindGroupMemberUser(ctx, req.GroupIDs, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1748,6 +1798,9 @@ func (g *groupServer) GetGroupMemberRoleLevel(ctx context.Context, req *pbgroup.
if len(req.RoleLevels) == 0 { if len(req.RoleLevels) == 0 {
return nil, errs.ErrArgs.WrapMsg("RoleLevels empty") return nil, errs.ErrArgs.WrapMsg("RoleLevels empty")
} }
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
members, err := g.db.FindGroupMemberRoleLevels(ctx, req.GroupID, req.RoleLevels) members, err := g.db.FindGroupMemberRoleLevels(ctx, req.GroupID, req.RoleLevels)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1763,6 +1816,9 @@ func (g *groupServer) GetGroupMemberRoleLevel(ctx context.Context, req *pbgroup.
} }
func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *pbgroup.GetGroupUsersReqApplicationListReq) (*pbgroup.GetGroupUsersReqApplicationListResp, error) { func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *pbgroup.GetGroupUsersReqApplicationListReq) (*pbgroup.GetGroupUsersReqApplicationListResp, error) {
if err := g.CheckGroupAdmin(ctx, req.GroupID); err != nil {
return nil, err
}
requests, err := g.db.FindGroupRequests(ctx, req.GroupID, req.UserIDs) requests, err := g.db.FindGroupRequests(ctx, req.GroupID, req.UserIDs)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
@ -365,13 +366,46 @@ func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Co
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, notification.WithRpcGetUserName(), notification.WithSendMessage(sendMessage)) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, notification.WithRpcGetUserName(), notification.WithSendMessage(sendMessage))
} }
func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) { func (g *NotificationSender) uuid() string {
return uuid.New().String()
}
func (g *NotificationSender) getGroupRequest(ctx context.Context, groupID string, userID string) (*sdkws.GroupRequest, error) {
request, err := g.db.TakeGroupRequest(ctx, groupID, userID)
if err != nil {
return nil, err
}
users, err := g.getUsersInfo(ctx, []string{userID})
if err != nil {
return nil, err
}
if len(users) == 0 {
return nil, servererrs.ErrUserIDNotFound.WrapMsg(fmt.Sprintf("user %s not found", userID))
}
info, ok := users[0].(*sdkws.UserInfo)
if !ok {
info = &sdkws.UserInfo{
UserID: users[0].GetUserID(),
Nickname: users[0].GetNickname(),
FaceURL: users[0].GetFaceURL(),
Ex: users[0].GetEx(),
}
}
return convert.Db2PbGroupRequest(request, info, nil), nil
}
func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq, dbReq *model.GroupRequest) {
var err error var err error
defer func() { defer func() {
if err != nil { if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
} }
}() }()
request, err := g.getGroupRequest(ctx, dbReq.GroupID, dbReq.UserID)
if err != nil {
log.ZError(ctx, "JoinGroupApplicationNotification getGroupRequest", err, "dbReq", dbReq)
return
}
var group *sdkws.GroupInfo var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID) group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil { if err != nil {
@ -387,7 +421,13 @@ func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Contex
return return
} }
userIDs = append(userIDs, req.InviterUserID, mcontext.GetOpUserID(ctx)) userIDs = append(userIDs, req.InviterUserID, mcontext.GetOpUserID(ctx))
tips := &sdkws.JoinGroupApplicationTips{Group: group, Applicant: user, ReqMsg: req.ReqMessage} tips := &sdkws.JoinGroupApplicationTips{
Group: group,
Applicant: user,
ReqMsg: req.ReqMessage,
Uuid: g.uuid(),
Request: request,
}
for _, userID := range datautil.Distinct(userIDs) { for _, userID := range datautil.Distinct(userIDs) {
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips)
} }
@ -417,6 +457,11 @@ func (g *NotificationSender) GroupApplicationAcceptedNotification(ctx context.Co
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
} }
}() }()
request, err := g.getGroupRequest(ctx, req.GroupID, req.FromUserID)
if err != nil {
log.ZError(ctx, "GroupApplicationAcceptedNotification getGroupRequest", err, "req", req)
return
}
var group *sdkws.GroupInfo var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID) group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil { if err != nil {
@ -432,8 +477,14 @@ func (g *NotificationSender) GroupApplicationAcceptedNotification(ctx context.Co
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return return
} }
tips := &sdkws.GroupApplicationAcceptedTips{
Group: group,
OpUser: opUser,
HandleMsg: req.HandledMsg,
Uuid: g.uuid(),
Request: request,
}
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID { if userID == req.FromUserID {
tips.ReceiverAs = applicantReceiver tips.ReceiverAs = applicantReceiver
} else { } else {
@ -450,6 +501,11 @@ func (g *NotificationSender) GroupApplicationRejectedNotification(ctx context.Co
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
} }
}() }()
request, err := g.getGroupRequest(ctx, req.GroupID, req.FromUserID)
if err != nil {
log.ZError(ctx, "GroupApplicationAcceptedNotification getGroupRequest", err, "req", req)
return
}
var group *sdkws.GroupInfo var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID) group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil { if err != nil {
@ -465,8 +521,14 @@ func (g *NotificationSender) GroupApplicationRejectedNotification(ctx context.Co
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return return
} }
tips := &sdkws.GroupApplicationRejectedTips{
Group: group,
OpUser: opUser,
HandleMsg: req.HandledMsg,
Uuid: g.uuid(),
Request: request,
}
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID { if userID == req.FromUserID {
tips.ReceiverAs = applicantReceiver tips.ReceiverAs = applicantReceiver
} else { } else {

View File

@ -16,13 +16,10 @@ package msg
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -89,19 +86,20 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf
}) })
} }
func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { // Move to msgtransfer
if msg.MsgData.ContentType == constant.Typing { // func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
return // if msg.MsgData.ContentType == constant.Typing {
} // return
if !filterAfterMsg(msg, after) { // }
return // if !filterAfterMsg(msg, after) {
} // return
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ // }
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), // cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
RecvID: msg.MsgData.RecvID, // CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
} // RecvID: msg.MsgData.RecvID,
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) // }
} // m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
// }
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -123,20 +121,21 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi
}) })
} }
func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { // Move to msgtransfer
if msg.MsgData.ContentType == constant.Typing { // func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
return // if msg.MsgData.ContentType == constant.Typing {
} // return
if !filterAfterMsg(msg, after) { // }
return // if !filterAfterMsg(msg, after) {
} // return
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ // }
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), // cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
GroupID: msg.MsgData.GroupID, // CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
} // GroupID: msg.MsgData.GroupID,
// }
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) // m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
} // }
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -205,14 +204,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
} }
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { // func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
keyMsgData := apistruct.KeyMsgData{ // keyMsgData := apistruct.KeyMsgData{
SendID: msg.SendID, // SendID: msg.SendID,
RecvID: msg.RecvID, // RecvID: msg.RecvID,
GroupID: msg.GroupID, // GroupID: msg.GroupID,
} // }
return map[string]string{ // return map[string]string{
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), // webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
} // }
} // }

View File

@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
go m.setConversationAtInfo(ctx, req.MsgData) go m.setConversationAtInfo(ctx, req.MsgData)
} }
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
prommetrics.GroupChatMsgProcessSuccessCounter.Inc() prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{} resp = &pbmsg.SendMsgResp{}
resp.SendTime = req.MsgData.SendTime resp.SendTime = req.MsgData.SendTime
@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
prommetrics.SingleChatMsgProcessFailedCounter.Inc() prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err return nil, err
} }
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
prommetrics.SingleChatMsgProcessSuccessCounter.Inc() prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
return &pbmsg.SendMsgResp{ return &pbmsg.SendMsgResp{
ServerMsgID: req.MsgData.ServerMsgID, ServerMsgID: req.MsgData.ServerMsgID,
@ -201,3 +203,25 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
}, nil }, nil
} }
} }
func (m *msgServer) SendSimpleMsg(ctx context.Context, req *pbmsg.SendSimpleMsgReq) (*pbmsg.SendSimpleMsgResp, error) {
if req.MsgData == nil {
return nil, errs.ErrArgs.WrapMsg("msg data is nil")
}
sender, err := m.UserLocalCache.GetUserInfo(ctx, req.MsgData.SendID)
if err != nil {
return nil, err
}
req.MsgData.SenderFaceURL = sender.FaceURL
req.MsgData.SenderNickname = sender.Nickname
resp, err := m.SendMsg(ctx, &pbmsg.SendMsgReq{MsgData: req.MsgData})
if err != nil {
return nil, err
}
return &pbmsg.SendSimpleMsgResp{
ServerMsgID: resp.ServerMsgID,
ClientMsgID: resp.ClientMsgID,
SendTime: resp.SendTime,
Modify: resp.Modify,
}, nil
}

View File

@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.KafkaConfig) builder := mqbuild.NewBuilder(&config.KafkaConfig)
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
if err != nil { if err != nil {

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/notification/common_user"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/mq/memamq"
@ -65,7 +66,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx) mgocli, err := dbb.Mongo(ctx)
if err != nil { if err != nil {
@ -100,23 +101,24 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
return err return err
} }
userClient := rpcli.NewUserClient(userConn) userClient := rpcli.NewUserClient(userConn)
database := controller.NewFriendDatabase(
friendMongoDB,
friendRequestMongoDB,
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB),
mgocli.GetTx(),
)
// Initialize notification sender // Initialize notification sender
notificationSender := NewFriendNotificationSender( notificationSender := NewFriendNotificationSender(
&config.NotificationConfig, &config.NotificationConfig,
rpcli.NewMsgClient(msgConn), rpcli.NewMsgClient(msgConn),
WithRpcFunc(userClient.GetUsersInfo), WithRpcFunc(userClient.GetUsersInfo),
WithFriendDB(database),
) )
localcache.InitLocalCache(&config.LocalCacheConfig) localcache.InitLocalCache(&config.LocalCacheConfig)
// Register Friend server with refactored MongoDB and Redis integrations // Register Friend server with refactored MongoDB and Redis integrations
relation.RegisterFriendServer(server, &friendServer{ relation.RegisterFriendServer(server, &friendServer{
db: controller.NewFriendDatabase( db: database,
friendMongoDB,
friendRequestMongoDB,
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB),
mgocli.GetTx(),
),
blackDatabase: controller.NewBlackDatabase( blackDatabase: controller.NewBlackDatabase(
blackMongoDB, blackMongoDB,
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB), redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB),
@ -190,7 +192,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
FromUserID: req.OwnerUserID, FromUserID: req.OwnerUserID,
ToUserID: userID, ToUserID: userID,
HandleResult: constant.FriendResponseAgree, HandleResult: constant.FriendResponseAgree,
}) }, false)
} }
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req) s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
@ -219,7 +221,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
return nil, err return nil, err
} }
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req) s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
s.notificationSender.FriendApplicationAgreedNotification(ctx, req) s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
return resp, nil return resp, nil
} }
if req.HandleResult == constant.FriendResponseRefuse { if req.HandleResult == constant.FriendResponseRefuse {
@ -328,7 +330,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, req *relat
return nil, err return nil, err
} }
resp = &relation.GetDesignatedFriendsApplyResp{} resp = &relation.GetDesignatedFriendsApplyResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.getCommonUserMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -341,13 +343,16 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
return nil, err return nil, err
} }
total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination) handleResults := datautil.Slice(req.HandleResults, func(e int32) int {
return int(e)
})
total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, handleResults, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp = &relation.GetPaginationFriendsApplyToResp{} resp = &relation.GetPaginationFriendsApplyToResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.getCommonUserMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -358,18 +363,20 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
} }
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *relation.GetPaginationFriendsApplyFromReq) (resp *relation.GetPaginationFriendsApplyFromResp, err error) { func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *relation.GetPaginationFriendsApplyFromReq) (resp *relation.GetPaginationFriendsApplyFromResp, err error) {
resp = &relation.GetPaginationFriendsApplyFromResp{}
if err := authverify.CheckAccess(ctx, req.UserID); err != nil { if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
total, friendRequests, err := s.db.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination) handleResults := datautil.Slice(req.HandleResults, func(e int32) int {
return int(e)
})
total, friendRequests, err := s.db.PageFriendRequestFromMe(ctx, req.UserID, handleResults, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) resp = &relation.GetPaginationFriendsApplyFromResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.getCommonUserMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -544,3 +551,28 @@ func (s *friendServer) UpdateFriends(ctx context.Context, req *relation.UpdateFr
s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs) s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
return resp, nil return resp, nil
} }
func (s *friendServer) GetSelfUnhandledApplyCount(ctx context.Context, req *relation.GetSelfUnhandledApplyCountReq) (*relation.GetSelfUnhandledApplyCountResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
count, err := s.db.GetUnhandledCount(ctx, req.UserID, req.Time)
if err != nil {
return nil, err
}
return &relation.GetSelfUnhandledApplyCountResp{
Count: count,
}, nil
}
func (s *friendServer) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) {
users, err := s.userClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.SliceToMapAny(users, func(e *sdkws.UserInfo) (string, common_user.CommonUser) {
return e.UserID, e
}), nil
}

View File

@ -19,6 +19,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
@ -52,9 +55,7 @@ func WithFriendDB(db controller.FriendDatabase) friendNotificationSenderOptions
} }
} }
func WithDBFunc( func WithDBFunc(fn func(ctx context.Context, userIDs []string) (users []*relationtb.User, err error)) friendNotificationSenderOptions {
fn func(ctx context.Context, userIDs []string) (users []*relationtb.User, err error),
) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) { return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) { f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) {
users, err := fn(ctx, userIDs) users, err := fn(ctx, userIDs)
@ -70,9 +71,7 @@ func WithDBFunc(
} }
} }
func WithRpcFunc( func WithRpcFunc(fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error)) friendNotificationSenderOptions {
fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error),
) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) { return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) { f := func(ctx context.Context, userIDs []string) (result []common_user.CommonUser, err error) {
users, err := fn(ctx, userIDs) users, err := fn(ctx, userIDs)
@ -100,10 +99,7 @@ func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.Msg
return f return f
} }
func (f *FriendNotificationSender) getUsersInfoMap( func (f *FriendNotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) {
ctx context.Context,
userIDs []string,
) (map[string]*sdkws.UserInfo, error) {
users, err := f.getUsersInfo(ctx, userIDs) users, err := f.getUsersInfo(ctx, userIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -116,10 +112,7 @@ func (f *FriendNotificationSender) getUsersInfoMap(
} }
//nolint:unused //nolint:unused
func (f *FriendNotificationSender) getFromToUserNickname( func (f *FriendNotificationSender) getFromToUserNickname(ctx context.Context, fromUserID, toUserID string) (string, string, error) {
ctx context.Context,
fromUserID, toUserID string,
) (string, string, error) {
users, err := f.getUsersInfoMap(ctx, []string{fromUserID, toUserID}) users, err := f.getUsersInfoMap(ctx, []string{fromUserID, toUserID})
if err != nil { if err != nil {
return "", "", nil return "", "", nil
@ -132,60 +125,113 @@ func (f *FriendNotificationSender) UserInfoUpdatedNotification(ctx context.Conte
f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips) f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips)
} }
func (f *FriendNotificationSender) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) {
users, err := f.getUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.SliceToMap(users, func(e common_user.CommonUser) string {
return e.GetUserID()
}), nil
}
func (f *FriendNotificationSender) getFriendRequests(ctx context.Context, fromUserID, toUserID string) (*sdkws.FriendRequest, error) {
if f.db == nil {
return nil, errs.ErrInternalServer.WithDetail("db is nil")
}
friendRequests, err := f.db.FindBothFriendRequests(ctx, fromUserID, toUserID)
if err != nil {
return nil, err
}
requests, err := convert.FriendRequestDB2Pb(ctx, friendRequests, f.getCommonUserMap)
if err != nil {
return nil, err
}
for _, request := range requests {
if request.FromUserID == fromUserID && request.ToUserID == toUserID {
return request, nil
}
}
return nil, errs.ErrRecordNotFound.WrapMsg("friend request not found", "fromUserID", fromUserID, "toUserID", toUserID)
}
func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *relation.ApplyToAddFriendReq) { func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *relation.ApplyToAddFriendReq) {
tips := sdkws.FriendApplicationTips{FromToUserID: &sdkws.FromToUserID{ request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil {
log.ZError(ctx, "FriendApplicationAddNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
return
}
tips := sdkws.FriendApplicationTips{
FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID, FromUserID: req.FromUserID,
ToUserID: req.ToUserID, ToUserID: req.ToUserID,
}} },
Request: request,
}
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
} }
func (f *FriendNotificationSender) FriendApplicationAgreedNotification( func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
ctx context.Context, var (
req *relation.RespondFriendApplyReq, request *sdkws.FriendRequest
) { err error
tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{ )
if checkReq {
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil {
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
return
}
}
tips := sdkws.FriendApplicationApprovedTips{
FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID, FromUserID: req.FromUserID,
ToUserID: req.ToUserID, ToUserID: req.ToUserID,
}, HandleMsg: req.HandleMsg} },
HandleMsg: req.HandleMsg,
Request: request,
}
f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips) f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips)
} }
func (f *FriendNotificationSender) FriendApplicationRefusedNotification( func (f *FriendNotificationSender) FriendApplicationRefusedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) {
ctx context.Context, request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
req *relation.RespondFriendApplyReq, if err != nil {
) { log.ZError(ctx, "FriendApplicationRefusedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{ return
}
tips := sdkws.FriendApplicationRejectedTips{
FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID, FromUserID: req.FromUserID,
ToUserID: req.ToUserID, ToUserID: req.ToUserID,
}, HandleMsg: req.HandleMsg} },
HandleMsg: req.HandleMsg,
Request: request,
}
f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips) f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips)
} }
func (f *FriendNotificationSender) FriendAddedNotification( //func (f *FriendNotificationSender) FriendAddedNotification(ctx context.Context, operationID, opUserID, fromUserID, toUserID string) error {
ctx context.Context, // tips := sdkws.FriendAddedTips{Friend: &sdkws.FriendInfo{}, OpUser: &sdkws.PublicUserInfo{}}
operationID, opUserID, fromUserID, toUserID string, // user, err := f.getUsersInfo(ctx, []string{opUserID})
) error { // if err != nil {
tips := sdkws.FriendAddedTips{Friend: &sdkws.FriendInfo{}, OpUser: &sdkws.PublicUserInfo{}} // return err
user, err := f.getUsersInfo(ctx, []string{opUserID}) // }
if err != nil { // tips.OpUser.UserID = user[0].GetUserID()
return err // tips.OpUser.Ex = user[0].GetEx()
} // tips.OpUser.Nickname = user[0].GetNickname()
tips.OpUser.UserID = user[0].GetUserID() // tips.OpUser.FaceURL = user[0].GetFaceURL()
tips.OpUser.Ex = user[0].GetEx() // friends, err := f.db.FindFriendsWithError(ctx, fromUserID, []string{toUserID})
tips.OpUser.Nickname = user[0].GetNickname() // if err != nil {
tips.OpUser.FaceURL = user[0].GetFaceURL() // return err
friends, err := f.db.FindFriendsWithError(ctx, fromUserID, []string{toUserID}) // }
if err != nil { // tips.Friend, err = convert.FriendDB2Pb(ctx, friends[0], f.getUsersInfoMap)
return err // if err != nil {
} // return err
tips.Friend, err = convert.FriendDB2Pb(ctx, friends[0], f.getUsersInfoMap) // }
if err != nil { // f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips)
return err // return nil
} //}
f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips)
return nil
}
func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *relation.DeleteFriendReq) { func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *relation.DeleteFriendReq) {
tips := sdkws.FriendDeletedTips{FromToUserID: &sdkws.FromToUserID{ tips := sdkws.FriendDeletedTips{FromToUserID: &sdkws.FromToUserID{

View File

@ -64,7 +64,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx) mgocli, err := dbb.Mongo(ctx)
if err != nil { if err != nil {

View File

@ -79,7 +79,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx) mgocli, err := dbb.Mongo(ctx)
if err != nil { if err != nil {
@ -197,6 +197,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
} }
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
//friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) //friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
//if err != nil { //if err != nil {
// return nil, err // return nil, err
@ -209,6 +210,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
//for _, friendID := range friends { //for _, friendID := range friends {
// s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) // s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
//} //}
s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req) s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req)
if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil { if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil {
return nil, err return nil, err

View File

@ -25,7 +25,7 @@ type Config struct {
Discovery config.Discovery Discovery config.Discovery
} }
func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
if conf.CronTask.RetainChatRecords < 1 { if conf.CronTask.RetainChatRecords < 1 {
log.ZInfo(ctx, "disable cron") log.ZInfo(ctx, "disable cron")
@ -49,6 +49,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
return err return err
} }
var locker Locker
if conf.Discovery.Enable == config.ETCD { if conf.Discovery.Enable == config.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
conf.CronTask.GetConfigFileName(), conf.CronTask.GetConfigFileName(),
@ -56,6 +57,14 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
conf.Discovery.GetConfigFileName(), conf.Discovery.GetConfigFileName(),
}) })
cm.Watch(ctx) cm.Watch(ctx)
locker, err = NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
if err != nil {
return err
}
}
if locker == nil {
locker = emptyLocker{}
} }
srv := &cronServer{ srv := &cronServer{
@ -65,6 +74,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
msgClient: msg.NewMsgClient(msgConn), msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn), conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn), thirdClient: third.NewThirdClient(thirdConn),
locker: locker,
} }
if err := srv.registerClearS3(); err != nil { if err := srv.registerClearS3(); err != nil {
@ -81,9 +91,21 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
log.ZDebug(ctx, "cron task server is running") log.ZDebug(ctx, "cron task server is running")
<-ctx.Done() <-ctx.Done()
log.ZDebug(ctx, "cron task server is shutting down") log.ZDebug(ctx, "cron task server is shutting down")
srv.cron.Stop()
return nil return nil
} }
type Locker interface {
ExecuteWithLock(ctx context.Context, taskName string, task func())
}
type emptyLocker struct{}
func (emptyLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) {
task()
}
type cronServer struct { type cronServer struct {
ctx context.Context ctx context.Context
config *Config config *Config
@ -91,6 +113,7 @@ type cronServer struct {
msgClient msg.MsgClient msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient thirdClient third.ThirdClient
locker Locker
} }
func (c *cronServer) registerClearS3() error { func (c *cronServer) registerClearS3() error {
@ -98,7 +121,9 @@ func (c *cronServer) registerClearS3() error {
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType) log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
return nil return nil
} }
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3) _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3)
})
return errs.WrapMsg(err, "failed to register clear s3 cron task") return errs.WrapMsg(err, "failed to register clear s3 cron task")
} }
@ -107,11 +132,15 @@ func (c *cronServer) registerDeleteMsg() error {
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords) log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
return nil return nil
} }
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg) _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg)
})
return errs.WrapMsg(err, "failed to register delete msg cron task") return errs.WrapMsg(err, "failed to register delete msg cron task")
} }
func (c *cronServer) registerClearUserMsg() error { func (c *cronServer) registerClearUserMsg() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg) _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg)
})
return errs.WrapMsg(err, "failed to register clear user msg cron task") return errs.WrapMsg(err, "failed to register clear user msg cron task")
} }

View File

@ -0,0 +1,89 @@
package cron
import (
"context"
"fmt"
"os"
"time"
"github.com/openimsdk/tools/log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
const (
lockLeaseTTL = 300
)
type EtcdLocker struct {
client *clientv3.Client
instanceID string
}
// NewEtcdLocker creates a new etcd distributed lock
func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) {
hostname, _ := os.Hostname()
pid := os.Getpid()
instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano())
locker := &EtcdLocker{
client: client,
instanceID: instanceID,
}
return locker, nil
}
func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) {
session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL))
if err != nil {
log.ZWarn(ctx, "Failed to create etcd session", err,
"taskName", taskName,
"instanceID", e.instanceID)
return
}
defer session.Close()
lockKey := fmt.Sprintf("openim/crontask/%s", taskName)
mutex := concurrency.NewMutex(session, lockKey)
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
err = mutex.TryLock(ctxWithTimeout)
if err != nil {
if err == context.DeadlineExceeded {
log.ZDebug(ctx, "Task is being executed by another instance, skipping",
"taskName", taskName,
"instanceID", e.instanceID)
} else {
log.ZWarn(ctx, "Failed to acquire task lock", err,
"taskName", taskName,
"instanceID", e.instanceID)
}
return
}
defer func() {
if err := mutex.Unlock(ctx); err != nil {
log.ZWarn(ctx, "Failed to release task lock", err,
"taskName", taskName,
"instanceID", e.instanceID)
} else {
log.ZInfo(ctx, "Successfully released task lock",
"taskName", taskName,
"instanceID", e.instanceID)
}
}()
log.ZInfo(ctx, "Successfully acquired task lock, starting execution",
"taskName", taskName,
"instanceID", e.instanceID,
"sessionID", session.Lease())
task()
log.ZInfo(ctx, "Task execution completed",
"taskName", taskName,
"instanceID", e.instanceID)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error {
a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts, a.apiConfig.API.Prometheus.AutoSetPorts,
nil, int(a.apiConfig.Index), nil, int(a.apiConfig.Index),
a.apiConfig.Discovery.RpcService.MessageGateway, prommetrics.APIKeyName,
&a.apiConfig.Notification, &a.apiConfig.Notification,
a.apiConfig, a.apiConfig,
[]string{}, []string{},

View File

@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer" "github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error {
"", "", "", "",
true, true,
nil, int(m.msgTransferConfig.Index), nil, int(m.msgTransferConfig.Index),
"", prommetrics.MessageTransferKeyName,
nil, nil,
m.msgTransferConfig, m.msgTransferConfig,
[]string{}, []string{},

View File

@ -327,10 +327,18 @@ type Redis struct {
Address []string `yaml:"address"` Address []string `yaml:"address"`
Username string `yaml:"username"` Username string `yaml:"username"`
Password string `yaml:"password"` Password string `yaml:"password"`
ClusterMode bool `yaml:"clusterMode"` RedisMode string `yaml:"redisMode"`
DB int `yaml:"storage"` DB int `yaml:"db"`
MaxRetry int `yaml:"maxRetry"` MaxRetry int `yaml:"maxRetry"`
PoolSize int `yaml:"poolSize"` PoolSize int `yaml:"poolSize"`
SentinelMode Sentinel `yaml:"sentinelMode"`
}
type Sentinel struct {
MasterName string `yaml:"masterName"`
SentinelAddrs []string `yaml:"sentinelsAddrs"`
RouteByLatency bool `yaml:"routeByLatency"`
RouteRandomly bool `yaml:"routeRandomly"`
} }
type BeforeConfig struct { type BeforeConfig struct {
@ -487,13 +495,19 @@ func (m *Mongo) Build() *mongoutil.Config {
func (r *Redis) Build() *redisutil.Config { func (r *Redis) Build() *redisutil.Config {
return &redisutil.Config{ return &redisutil.Config{
ClusterMode: r.ClusterMode, RedisMode: r.RedisMode,
Address: r.Address, Address: r.Address,
Username: r.Username, Username: r.Username,
Password: r.Password, Password: r.Password,
DB: r.DB, DB: r.DB,
MaxRetry: r.MaxRetry, MaxRetry: r.MaxRetry,
PoolSize: r.PoolSize, PoolSize: r.PoolSize,
Sentinel: &redisutil.Sentinel{
MasterName: r.SentinelMode.MasterName,
SentinelAddrs: r.SentinelMode.SentinelAddrs,
RouteByLatency: r.SentinelMode.RouteByLatency,
RouteRandomly: r.SentinelMode.RouteRandomly,
},
} }
} }

View File

@ -17,7 +17,9 @@ package convert
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/notification/common_user"
"github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
@ -98,7 +100,7 @@ func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly {
}) })
} }
func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendRequest, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) ([]*sdkws.FriendRequest, error) { func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendRequest, getUsers func(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error)) ([]*sdkws.FriendRequest, error) {
if len(friendRequests) == 0 { if len(friendRequests) == 0 {
return nil, nil return nil, nil
} }
@ -117,11 +119,11 @@ func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendReque
fromUser := users[friendRequest.FromUserID] fromUser := users[friendRequest.FromUserID]
res = append(res, &sdkws.FriendRequest{ res = append(res, &sdkws.FriendRequest{
FromUserID: friendRequest.FromUserID, FromUserID: friendRequest.FromUserID,
FromNickname: fromUser.Nickname, FromNickname: fromUser.GetNickname(),
FromFaceURL: fromUser.FaceURL, FromFaceURL: fromUser.GetFaceURL(),
ToUserID: friendRequest.ToUserID, ToUserID: friendRequest.ToUserID,
ToNickname: toUser.Nickname, ToNickname: toUser.GetNickname(),
ToFaceURL: toUser.FaceURL, ToFaceURL: toUser.GetFaceURL(),
HandleResult: friendRequest.HandleResult, HandleResult: friendRequest.HandleResult,
ReqMsg: friendRequest.ReqMsg, ReqMsg: friendRequest.ReqMsg,
CreateTime: friendRequest.CreateTime.UnixMilli(), CreateTime: friendRequest.CreateTime.UnixMilli(),

View File

@ -85,6 +85,8 @@ func Start(listener net.Listener) error {
const ( const (
APIKeyName = "api" APIKeyName = "api"
MessageTransferKeyName = "message-transfer" MessageTransferKeyName = "message-transfer"
TTL = 300
) )
type Target struct { type Target struct {
@ -97,10 +99,14 @@ type RespTarget struct {
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
} }
func BuildDiscoveryKey(name string) string { func BuildDiscoveryKeyPrefix(name string) string {
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
} }
func BuildDiscoveryKey(name string, index int) string {
return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index)
}
func BuildDefaultTarget(host string, ip int) Target { func BuildDefaultTarget(host string, ip int) Target {
return Target{ return Target{
Target: fmt.Sprintf("%s:%d", host, ip), Target: fmt.Sprintf("%s:%d", host, ip),

View File

@ -50,7 +50,7 @@ func init() {
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string, watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
options ...grpc.ServerOption) error { options ...grpc.ServerOption) error {
if notification != nil { if notification != nil {
@ -148,11 +148,13 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
if err != nil { if err != nil {
return err return err
} }
if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { if autoSetPorts {
if !errors.Is(err, discovery.ErrNotSupportedKeyValue) { if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil {
if !errors.Is(err, discovery.ErrNotSupported) {
return err return err
} }
} }
}
go func() { go func() {
err := prommetrics.Start(prometheusListener) err := prommetrics.Start(prometheusListener)
if err == nil { if err == nil {

View File

@ -26,7 +26,7 @@ func TestName111111(t *testing.T) {
"172.16.8.124:7005", "172.16.8.124:7005",
"172.16.8.124:7006", "172.16.8.124:7006",
}, },
ClusterMode: true, RedisMode: "cluster",
Password: "passwd123", Password: "passwd123",
//Address: []string{"localhost:16379"}, //Address: []string{"localhost:16379"},
//Password: "openIM123", //Password: "openIM123",

View File

@ -17,10 +17,11 @@ package controller
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@ -61,10 +62,10 @@ type FriendDatabase interface {
PageInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*model.Friend, err error) PageInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*model.Friend, err error)
// PageFriendRequestFromMe retrieves the friend requests sent by the user with pagination // PageFriendRequestFromMe retrieves the friend requests sent by the user with pagination
PageFriendRequestFromMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) PageFriendRequestFromMe(ctx context.Context, userID string, handleResults []int, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error)
// PageFriendRequestToMe retrieves the friend requests received by the user with pagination // PageFriendRequestToMe retrieves the friend requests received by the user with pagination
PageFriendRequestToMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) PageFriendRequestToMe(ctx context.Context, userID string, handleResults []int, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error)
// FindFriendsWithError fetches specified friends of a user and returns an error if any do not exist // FindFriendsWithError fetches specified friends of a user and returns an error if any do not exist
FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*model.Friend, err error) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*model.Friend, err error)
@ -87,6 +88,8 @@ type FriendDatabase interface {
FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error)
OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error
GetUnhandledCount(ctx context.Context, userID string, ts int64) (int64, error)
} }
type friendDatabase struct { type friendDatabase struct {
@ -334,13 +337,13 @@ func (f *friendDatabase) PageInWhoseFriends(ctx context.Context, friendUserID st
} }
// PageFriendRequestFromMe retrieves friend requests sent by me. It does not return an error if the result is empty. // PageFriendRequestFromMe retrieves friend requests sent by me. It does not return an error if the result is empty.
func (f *friendDatabase) PageFriendRequestFromMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) { func (f *friendDatabase) PageFriendRequestFromMe(ctx context.Context, userID string, handleResults []int, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) {
return f.friendRequest.FindFromUserID(ctx, userID, pagination) return f.friendRequest.FindFromUserID(ctx, userID, handleResults, pagination)
} }
// PageFriendRequestToMe retrieves friend requests received by me. It does not return an error if the result is empty. // PageFriendRequestToMe retrieves friend requests received by me. It does not return an error if the result is empty.
func (f *friendDatabase) PageFriendRequestToMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) { func (f *friendDatabase) PageFriendRequestToMe(ctx context.Context, userID string, handleResults []int, pagination pagination.Pagination) (total int64, friends []*model.FriendRequest, err error) {
return f.friendRequest.FindToUserID(ctx, userID, pagination) return f.friendRequest.FindToUserID(ctx, userID, handleResults, pagination)
} }
// FindFriendsWithError retrieves specified friends' information for ownerUserID. Returns an error if any friend does not exist. // FindFriendsWithError retrieves specified friends' information for ownerUserID. Returns an error if any friend does not exist.
@ -397,3 +400,7 @@ func (f *friendDatabase) OwnerIncrVersion(ctx context.Context, ownerUserID strin
} }
return f.cache.DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx) return f.cache.DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx)
} }
func (f *friendDatabase) GetUnhandledCount(ctx context.Context, userID string, ts int64) (int64, error) {
return f.friendRequest.GetUnhandledCount(ctx, userID, ts)
}

View File

@ -68,7 +68,7 @@ type GroupDatabase interface {
// FindUserManagedGroupID retrieves group IDs managed by a user. // FindUserManagedGroupID retrieves group IDs managed by a user.
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
// PageGroupRequest paginates through group requests for specified groups. // PageGroupRequest paginates through group requests for specified groups.
PageGroupRequest(ctx context.Context, groupIDs []string, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) PageGroupRequest(ctx context.Context, groupIDs []string, handleResults []int, pagination pagination.Pagination) (int64, []*model.GroupRequest, error)
// GetGroupRoleLevelMemberIDs retrieves user IDs of group members with a specific role level. // GetGroupRoleLevelMemberIDs retrieves user IDs of group members with a specific role level.
GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error) GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error)
@ -100,7 +100,7 @@ type GroupDatabase interface {
// FindGroupRequests retrieves multiple group join requests. // FindGroupRequests retrieves multiple group join requests.
FindGroupRequests(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupRequest, error) FindGroupRequests(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupRequest, error)
// PageGroupRequestUser paginates through group join requests made by a user. // PageGroupRequestUser paginates through group join requests made by a user.
PageGroupRequestUser(ctx context.Context, userID string, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) PageGroupRequestUser(ctx context.Context, userID string, groupIDs []string, handleResults []int, pagination pagination.Pagination) (int64, []*model.GroupRequest, error)
// CountTotal counts the total number of groups as of a certain date. // CountTotal counts the total number of groups as of a certain date.
CountTotal(ctx context.Context, before *time.Time) (count int64, err error) CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
@ -124,6 +124,8 @@ type GroupDatabase interface {
SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error)
FindJoinGroupID(ctx context.Context, userID string) ([]string, error) FindJoinGroupID(ctx context.Context, userID string) ([]string, error)
GetGroupApplicationUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error)
} }
func NewGroupDatabase( func NewGroupDatabase(
@ -304,8 +306,8 @@ func (g *groupDatabase) FindUserManagedGroupID(ctx context.Context, userID strin
return g.groupMemberDB.FindUserManagedGroupID(ctx, userID) return g.groupMemberDB.FindUserManagedGroupID(ctx, userID)
} }
func (g *groupDatabase) PageGroupRequest(ctx context.Context, groupIDs []string, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) { func (g *groupDatabase) PageGroupRequest(ctx context.Context, groupIDs []string, handleResults []int, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) {
return g.groupRequestDB.PageGroup(ctx, groupIDs, pagination) return g.groupRequestDB.PageGroup(ctx, groupIDs, handleResults, pagination)
} }
func (g *groupDatabase) PageGetJoinGroup(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, totalGroupMembers []*model.GroupMember, err error) { func (g *groupDatabase) PageGetJoinGroup(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, totalGroupMembers []*model.GroupMember, err error) {
@ -463,16 +465,12 @@ func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*mode
}) })
} }
func (g *groupDatabase) TakeGroupRequest( func (g *groupDatabase) TakeGroupRequest(ctx context.Context, groupID string, userID string) (*model.GroupRequest, error) {
ctx context.Context,
groupID string,
userID string,
) (*model.GroupRequest, error) {
return g.groupRequestDB.Take(ctx, groupID, userID) return g.groupRequestDB.Take(ctx, groupID, userID)
} }
func (g *groupDatabase) PageGroupRequestUser(ctx context.Context, userID string, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) { func (g *groupDatabase) PageGroupRequestUser(ctx context.Context, userID string, groupIDs []string, handleResults []int, pagination pagination.Pagination) (int64, []*model.GroupRequest, error) {
return g.groupRequestDB.Page(ctx, userID, pagination) return g.groupRequestDB.Page(ctx, userID, groupIDs, handleResults, pagination)
} }
func (g *groupDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { func (g *groupDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
@ -565,3 +563,7 @@ func (g *groupDatabase) MemberGroupIncrVersion(ctx context.Context, groupID stri
} }
return g.cache.DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx) return g.cache.DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx)
} }
func (g *groupDatabase) GetGroupApplicationUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error) {
return g.groupRequestDB.GetUnhandledCount(ctx, groupIDs, ts)
}

View File

@ -16,6 +16,7 @@ package database
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
) )
@ -33,8 +34,9 @@ type FriendRequest interface {
Find(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error) Find(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error)
Take(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error) Take(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error)
// Get list of friend requests received by toUserID // Get list of friend requests received by toUserID
FindToUserID(ctx context.Context, toUserID string, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) FindToUserID(ctx context.Context, toUserID string, handleResults []int, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error)
// Get list of friend requests sent by fromUserID // Get list of friend requests sent by fromUserID
FindFromUserID(ctx context.Context, fromUserID string, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) FindFromUserID(ctx context.Context, fromUserID string, handleResults []int, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error)
FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*model.FriendRequest, err error) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*model.FriendRequest, err error)
GetUnhandledCount(ctx context.Context, userID string, ts int64) (int64, error)
} }

View File

@ -16,6 +16,7 @@ package database
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
) )
@ -26,6 +27,7 @@ type GroupRequest interface {
UpdateHandler(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32) (err error) UpdateHandler(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32) (err error)
Take(ctx context.Context, groupID string, userID string) (groupRequest *model.GroupRequest, err error) Take(ctx context.Context, groupID string, userID string) (groupRequest *model.GroupRequest, err error)
FindGroupRequests(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupRequest, error) FindGroupRequests(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupRequest, error)
Page(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) Page(ctx context.Context, userID string, groupIDs []string, handleResults []int, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error)
PageGroup(ctx context.Context, groupIDs []string, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) PageGroup(ctx context.Context, groupIDs []string, handleResults []int, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error)
GetUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error)
} }

View File

@ -16,24 +16,35 @@ package mgo
import ( import (
"context" "context"
"time"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
) )
func NewFriendRequestMongo(db *mongo.Database) (database.FriendRequest, error) { func NewFriendRequestMongo(db *mongo.Database) (database.FriendRequest, error) {
coll := db.Collection(database.FriendRequestName) coll := db.Collection(database.FriendRequestName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{ Keys: bson.D{
{Key: "from_user_id", Value: 1}, {Key: "from_user_id", Value: 1},
{Key: "to_user_id", Value: 1}, {Key: "to_user_id", Value: 1},
}, },
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "create_time", Value: -1},
},
},
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -45,12 +56,24 @@ type FriendRequestMgo struct {
coll *mongo.Collection coll *mongo.Collection
} }
func (f *FriendRequestMgo) FindToUserID(ctx context.Context, toUserID string, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) { func (f *FriendRequestMgo) sort() any {
return mongoutil.FindPage[*model.FriendRequest](ctx, f.coll, bson.M{"to_user_id": toUserID}, pagination) return bson.D{{Key: "create_time", Value: -1}}
} }
func (f *FriendRequestMgo) FindFromUserID(ctx context.Context, fromUserID string, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) { func (f *FriendRequestMgo) FindToUserID(ctx context.Context, toUserID string, handleResults []int, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) {
return mongoutil.FindPage[*model.FriendRequest](ctx, f.coll, bson.M{"from_user_id": fromUserID}, pagination) filter := bson.M{"to_user_id": toUserID}
if len(handleResults) > 0 {
filter["handle_result"] = bson.M{"$in": handleResults}
}
return mongoutil.FindPage[*model.FriendRequest](ctx, f.coll, filter, pagination, options.Find().SetSort(f.sort()))
}
func (f *FriendRequestMgo) FindFromUserID(ctx context.Context, fromUserID string, handleResults []int, pagination pagination.Pagination) (total int64, friendRequests []*model.FriendRequest, err error) {
filter := bson.M{"from_user_id": fromUserID}
if len(handleResults) > 0 {
filter["handle_result"] = bson.M{"$in": handleResults}
}
return mongoutil.FindPage[*model.FriendRequest](ctx, f.coll, filter, pagination, options.Find().SetSort(f.sort()))
} }
func (f *FriendRequestMgo) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*model.FriendRequest, err error) { func (f *FriendRequestMgo) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*model.FriendRequest, err error) {
@ -110,3 +133,11 @@ func (f *FriendRequestMgo) Find(ctx context.Context, fromUserID, toUserID string
func (f *FriendRequestMgo) Take(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error) { func (f *FriendRequestMgo) Take(ctx context.Context, fromUserID, toUserID string) (friendRequest *model.FriendRequest, err error) {
return f.Find(ctx, fromUserID, toUserID) return f.Find(ctx, fromUserID, toUserID)
} }
func (f *FriendRequestMgo) GetUnhandledCount(ctx context.Context, userID string, ts int64) (int64, error) {
filter := bson.M{"to_user_id": userID, "handle_result": 0}
if ts != 0 {
filter["create_time"] = bson.M{"$gt": time.Unix(ts, 0)}
}
return mongoutil.Count(ctx, f.coll, filter)
}

View File

@ -16,25 +16,36 @@ package mgo
import ( import (
"context" "context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewGroupRequestMgo(db *mongo.Database) (database.GroupRequest, error) { func NewGroupRequestMgo(db *mongo.Database) (database.GroupRequest, error) {
coll := db.Collection(database.GroupRequestName) coll := db.Collection(database.GroupRequestName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{ Keys: bson.D{
{Key: "group_id", Value: 1}, {Key: "group_id", Value: 1},
{Key: "user_id", Value: 1}, {Key: "user_id", Value: 1},
}, },
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "req_time", Value: -1},
},
},
}) })
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -66,10 +77,39 @@ func (g *GroupRequestMgo) FindGroupRequests(ctx context.Context, groupID string,
return mongoutil.Find[*model.GroupRequest](ctx, g.coll, bson.M{"group_id": groupID, "user_id": bson.M{"$in": userIDs}}) return mongoutil.Find[*model.GroupRequest](ctx, g.coll, bson.M{"group_id": groupID, "user_id": bson.M{"$in": userIDs}})
} }
func (g *GroupRequestMgo) Page(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) { func (g *GroupRequestMgo) sort() any {
return mongoutil.FindPage[*model.GroupRequest](ctx, g.coll, bson.M{"user_id": userID}, pagination) return bson.D{{Key: "req_time", Value: -1}}
} }
func (g *GroupRequestMgo) PageGroup(ctx context.Context, groupIDs []string, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) { func (g *GroupRequestMgo) Page(ctx context.Context, userID string, groupIDs []string, handleResults []int, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) {
return mongoutil.FindPage[*model.GroupRequest](ctx, g.coll, bson.M{"group_id": bson.M{"$in": groupIDs}}, pagination) filter := bson.M{"user_id": userID}
if len(groupIDs) > 0 {
filter["group_id"] = bson.M{"$in": datautil.Distinct(groupIDs)}
}
if len(handleResults) > 0 {
filter["handle_result"] = bson.M{"$in": handleResults}
}
return mongoutil.FindPage[*model.GroupRequest](ctx, g.coll, filter, pagination, options.Find().SetSort(g.sort()))
}
func (g *GroupRequestMgo) PageGroup(ctx context.Context, groupIDs []string, handleResults []int, pagination pagination.Pagination) (total int64, groups []*model.GroupRequest, err error) {
if len(groupIDs) == 0 {
return 0, nil, nil
}
filter := bson.M{"group_id": bson.M{"$in": groupIDs}}
if len(handleResults) > 0 {
filter["handle_result"] = bson.M{"$in": handleResults}
}
return mongoutil.FindPage[*model.GroupRequest](ctx, g.coll, filter, pagination, options.Find().SetSort(g.sort()))
}
func (g *GroupRequestMgo) GetUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error) {
if len(groupIDs) == 0 {
return 0, nil
}
filter := bson.M{"group_id": bson.M{"$in": groupIDs}, "handle_result": 0}
if ts != 0 {
filter["req_time"] = bson.M{"$gt": time.Unix(ts, 0)}
}
return mongoutil.Count(ctx, g.coll, filter)
} }

View File

@ -1,6 +1,6 @@
serviceBinaries: serviceBinaries:
openim-api: 1 openim-api: 1
openim-crontask: 1 openim-crontask: 4
openim-rpc-user: 1 openim-rpc-user: 1
openim-msggateway: 1 openim-msggateway: 1
openim-push: 8 openim-push: 8

View File

@ -28,6 +28,8 @@ import (
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
) )
const StructTagName = "yaml"
const ( const (
MaxSeq = "MAX_SEQ:" MaxSeq = "MAX_SEQ:"
MinSeq = "MIN_SEQ:" MinSeq = "MIN_SEQ:"
@ -54,13 +56,14 @@ func readConfig[T any](dir string, name string) (*T, error) {
if err := v.ReadInConfig(); err != nil { if err := v.ReadInConfig(); err != nil {
return nil, err return nil, err
} }
fn := func(config *mapstructure.DecoderConfig) {
config.TagName = "mapstructure"
}
var conf T var conf T
if err := v.Unmarshal(&conf, fn); err != nil { if err := v.Unmarshal(&conf, func(config *mapstructure.DecoderConfig) {
config.TagName = StructTagName
}); err != nil {
return nil, err return nil, err
} }
return &conf, nil return &conf, nil
} }
@ -69,6 +72,7 @@ func Main(conf string, del time.Duration) error {
if err != nil { if err != nil {
return err return err
} }
mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName) mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName)
if err != nil { if err != nil {
return err return err