mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-26 21:22:16 +08:00
Merge branch 'main' of github.com:openimsdk/open-im-server into build/k8s
This commit is contained in:
commit
2d5eb5197b
4
.env
4
.env
@ -1,6 +1,5 @@
|
||||
MONGO_IMAGE=mongo:6.0.2
|
||||
MONGO_IMAGE=mongo:7.0
|
||||
REDIS_IMAGE=redis:7.0.0
|
||||
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
|
||||
KAFKA_IMAGE=bitnami/kafka:3.5.1
|
||||
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
|
||||
ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13
|
||||
@ -16,4 +15,3 @@ OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.8.2
|
||||
#OPENIM_ADMIN_FRONT_IMAGE=registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-admin-front:release-v1.8.2
|
||||
|
||||
DATA_DIR=./
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
# Use Go 1.21 Alpine as the base image for building the application
|
||||
# Use Go 1.22 Alpine as the base image for building the application
|
||||
FROM golang:1.22-alpine AS builder
|
||||
|
||||
# Define the base directory for the application as an environment variable
|
||||
|
||||
@ -8,6 +8,8 @@ database: openim_v3
|
||||
username: openIM
|
||||
# Password for database authentication
|
||||
password: openIM123
|
||||
# Authentication source for database authentication, if use root user, set it to admin
|
||||
authSource: openim_v3
|
||||
# Maximum number of connections in the connection pool
|
||||
maxPoolSize: 100
|
||||
# Maximum number of retry attempts for a failed database connection
|
||||
|
||||
@ -22,5 +22,3 @@ longConnSvr:
|
||||
websocketMaxMsgLen: 4096
|
||||
# WebSocket connection handshake timeout in seconds
|
||||
websocketTimeout: 10
|
||||
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ prometheus:
|
||||
ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ]
|
||||
|
||||
maxConcurrentWorkers: 3
|
||||
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
|
||||
#Use geTui for offline push notifications, or choose fcm or jpush; corresponding configuration settings must be specified.
|
||||
enable: geTui
|
||||
geTui:
|
||||
pushUrl: https://restapi.getui.com/v2/$appId
|
||||
@ -26,7 +26,7 @@ fcm:
|
||||
# Prioritize using file paths. If the file path is empty, use URL
|
||||
filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath.
|
||||
authURL: # Must start with https or http.
|
||||
jpns:
|
||||
jpush:
|
||||
appKey:
|
||||
masterSecret:
|
||||
pushURL:
|
||||
@ -34,8 +34,8 @@ jpns:
|
||||
|
||||
# iOS system push sound and badge count
|
||||
iosPush:
|
||||
pushSound: xxx
|
||||
badgeCount: true
|
||||
production: false
|
||||
pushSound: xxx
|
||||
badgeCount: true
|
||||
production: false
|
||||
|
||||
fullUserCache: true
|
||||
|
||||
@ -240,11 +240,11 @@ push:
|
||||
channelName: ${GETUI_CHANNEL_NAME}
|
||||
fcm:
|
||||
serviceAccount: "${FCM_SERVICE_ACCOUNT}"
|
||||
jpns:
|
||||
appKey: ${JPNS_APP_KEY}
|
||||
masterSecret: ${JPNS_MASTER_SECRET}
|
||||
pushUrl: ${JPNS_PUSH_URL}
|
||||
pushIntent: ${JPNS_PUSH_INTENT}
|
||||
jpush:
|
||||
appKey: ${JPUSH_APP_KEY}
|
||||
masterSecret: ${JPUSH_MASTER_SECRET}
|
||||
pushUrl: ${JPUSH_PUSH_URL}
|
||||
pushIntent: ${JPUSH_PUSH_INTENT}
|
||||
|
||||
# App manager configuration
|
||||
#
|
||||
|
||||
@ -8,12 +8,35 @@ services:
|
||||
ports:
|
||||
- "37017:27017"
|
||||
container_name: mongo
|
||||
command: ["/bin/bash", "-c", "/docker-entrypoint-initdb.d/mongo-init.sh; docker-entrypoint.sh mongod --wiredTigerCacheSizeGB 1 --auth"]
|
||||
command: >
|
||||
bash -c '
|
||||
docker-entrypoint.sh mongod --wiredTigerCacheSizeGB $$wiredTigerCacheSizeGB --auth &
|
||||
until mongosh -u $$MONGO_INITDB_ROOT_USERNAME -p $$MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin --eval "db.runCommand({ ping: 1 })" &>/dev/null; do
|
||||
echo "Waiting for MongoDB to start..."
|
||||
sleep 1
|
||||
done &&
|
||||
mongosh -u $$MONGO_INITDB_ROOT_USERNAME -p $$MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin --eval "
|
||||
db = db.getSiblingDB(\"$$MONGO_INITDB_DATABASE\");
|
||||
if (!db.getUser(\"$$MONGO_OPENIM_USERNAME\")) {
|
||||
db.createUser({
|
||||
user: \"$$MONGO_OPENIM_USERNAME\",
|
||||
pwd: \"$$MONGO_OPENIM_PASSWORD\",
|
||||
roles: [{role: \"readWrite\", db: \"$$MONGO_INITDB_DATABASE\"}]
|
||||
});
|
||||
print(\"User created successfully: \");
|
||||
print(\"Username: $$MONGO_OPENIM_USERNAME\");
|
||||
print(\"Password: $$MONGO_OPENIM_PASSWORD\");
|
||||
print(\"Database: $$MONGO_INITDB_DATABASE\");
|
||||
} else {
|
||||
print(\"User already exists in database: $$MONGO_INITDB_DATABASE, Username: $$MONGO_OPENIM_USERNAME\");
|
||||
}
|
||||
" &&
|
||||
tail -f /dev/null
|
||||
'
|
||||
volumes:
|
||||
- "${DATA_DIR}/components/mongodb/data/db:/data/db"
|
||||
- "${DATA_DIR}/components/mongodb/data/logs:/data/logs"
|
||||
- "${DATA_DIR}/components/mongodb/data/conf:/etc/mongo"
|
||||
- "./scripts/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro"
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
- wiredTigerCacheSizeGB=1
|
||||
@ -71,10 +94,7 @@ services:
|
||||
ports:
|
||||
- "19094:9094"
|
||||
volumes:
|
||||
- ./scripts/create-topic.sh:/opt/bitnami/kafka/create-topic.sh
|
||||
- "${DATA_DIR}/components/kafka:/bitnami/kafka"
|
||||
command: >
|
||||
bash -c "/opt/bitnami/scripts/kafka/run.sh & /opt/bitnami/kafka/create-topic.sh; wait"
|
||||
environment:
|
||||
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
|
||||
TZ: Asia/Shanghai
|
||||
@ -85,10 +105,11 @@ services:
|
||||
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
|
||||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||
KAFKA_NUM_PARTITIONS: 8
|
||||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
||||
networks:
|
||||
- openim
|
||||
|
||||
|
||||
minio:
|
||||
image: "${MINIO_IMAGE}"
|
||||
ports:
|
||||
@ -171,4 +192,3 @@ services:
|
||||
# - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana
|
||||
# networks:
|
||||
# - openim
|
||||
|
||||
|
||||
@ -474,10 +474,10 @@ This section involves setting up additional configuration variables for Websocke
|
||||
| GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID |
|
||||
| GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name |
|
||||
| FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account |
|
||||
| JPNS_APP_KEY | [User Defined] | JPNS Application Key |
|
||||
| JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret |
|
||||
| JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL |
|
||||
| JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent |
|
||||
| JPUSH_APP_KEY | [User Defined] | JPUSH Application Key |
|
||||
| JPUSH_MASTER_SECRET | [User Defined] | JPUSH Master Secret |
|
||||
| JPUSH_PUSH_URL | [User Defined] | JPUSH Push Notification URL |
|
||||
| JPUSH_PUSH_INTENT | [User Defined] | JPUSH Push Intent |
|
||||
| IM_ADMIN_USERID | "imAdmin" | IM Administrator ID |
|
||||
| IM_ADMIN_NAME | "imAdmin" | IM Administrator Nickname |
|
||||
| MULTILOGIN_POLICY | "1" | Multi-login Policy |
|
||||
|
||||
4
go.mod
4
go.mod
@ -15,7 +15,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.54
|
||||
github.com/openimsdk/tools v0.0.50-alpha.23
|
||||
github.com/openimsdk/tools v0.0.50-alpha.32
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
@ -37,7 +37,7 @@ require (
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
github.com/kelindar/bitmap v1.5.2
|
||||
github.com/likexian/gokit v0.25.13
|
||||
github.com/mo3et/openim-gomake v0.0.18
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5
|
||||
github.com/redis/go-redis/v9 v9.4.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible
|
||||
|
||||
8
go.sum
8
go.sum
@ -325,8 +325,6 @@ github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5
|
||||
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/mo3et/openim-gomake v0.0.18 h1:PlX+Sx6UqIBKhe+BlmU8fHs5BbUKqMl4rKqWGDxGQCA=
|
||||
github.com/mo3et/openim-gomake v0.0.18/go.mod h1:gyCVDWC6L32//+HIf47EeB53S2JYsl3QZDFpC/m6kbE=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
@ -347,10 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
|
||||
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
|
||||
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.23 h1:gGCOVDFC/wrj2ybLrSrykGlcTfB1Z7MwfiWpEnjJKyU=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.23/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.32 h1:JEsUFHFnaYg230TG+Ke3SUnaA2h44t4kABAzEdv5VZw=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.32/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
|
||||
@ -16,6 +16,7 @@ package msggateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
@ -69,6 +70,8 @@ type Client struct {
|
||||
IsCompress bool `json:"isCompress"`
|
||||
UserID string `json:"userID"`
|
||||
IsBackground bool `json:"isBackground"`
|
||||
SDKType string `json:"sdkType"`
|
||||
Encoder Encoder
|
||||
ctx *UserConnContext
|
||||
longConnServer LongConnServer
|
||||
closed atomic.Bool
|
||||
@ -94,11 +97,17 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
|
||||
c.closed.Store(false)
|
||||
c.closedErr = nil
|
||||
c.token = ctx.GetToken()
|
||||
c.SDKType = ctx.GetSDKType()
|
||||
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
|
||||
c.subLock = new(sync.Mutex)
|
||||
if c.subUserIDs != nil {
|
||||
clear(c.subUserIDs)
|
||||
}
|
||||
if c.SDKType == GoSDK {
|
||||
c.Encoder = NewGobEncoder()
|
||||
} else {
|
||||
c.Encoder = NewJsonEncoder()
|
||||
}
|
||||
c.subUserIDs = make(map[string]struct{})
|
||||
}
|
||||
|
||||
@ -159,9 +168,12 @@ func (c *Client) readMessage() {
|
||||
return
|
||||
}
|
||||
case MessageText:
|
||||
c.closedErr = ErrNotSupportMessageProtocol
|
||||
return
|
||||
|
||||
_ = c.conn.SetReadDeadline(pongWait)
|
||||
parseDataErr := c.handlerTextMessage(message)
|
||||
if parseDataErr != nil {
|
||||
c.closedErr = parseDataErr
|
||||
return
|
||||
}
|
||||
case PingMessage:
|
||||
err := c.writePongMsg("")
|
||||
log.ZError(c.ctx, "writePongMsg", err)
|
||||
@ -188,7 +200,7 @@ func (c *Client) handleMessage(message []byte) error {
|
||||
var binaryReq = getReq()
|
||||
defer freeReq(binaryReq)
|
||||
|
||||
err := c.longConnServer.Decode(message, binaryReq)
|
||||
err := c.Encoder.Decode(message, binaryReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -335,7 +347,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
encodedBuf, err := c.longConnServer.Encode(resp)
|
||||
encodedBuf, err := c.Encoder.Encode(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -419,3 +431,26 @@ func (c *Client) writePongMsg(appData string) error {
|
||||
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (c *Client) handlerTextMessage(b []byte) error {
|
||||
var msg TextMessage
|
||||
if err := json.Unmarshal(b, &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
switch msg.Type {
|
||||
case TextPong:
|
||||
return nil
|
||||
case TextPing:
|
||||
msg.Type = TextPong
|
||||
msgData, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.conn.WriteMessage(MessageText, msgData)
|
||||
default:
|
||||
return fmt.Errorf("not support message type %s", msg.Type)
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,12 @@ const (
|
||||
GzipCompressionProtocol = "gzip"
|
||||
BackgroundStatus = "isBackground"
|
||||
SendResponse = "isMsgResp"
|
||||
SDKType = "sdkType"
|
||||
)
|
||||
|
||||
const (
|
||||
GoSDK = "go"
|
||||
JsSDK = "js"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@ -153,6 +153,14 @@ func (c *UserConnContext) GetCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *UserConnContext) GetSDKType() string {
|
||||
sdkType := c.Req.URL.Query().Get(SDKType)
|
||||
if sdkType == "" {
|
||||
sdkType = GoSDK
|
||||
}
|
||||
return sdkType
|
||||
}
|
||||
|
||||
func (c *UserConnContext) ShouldSendResp() bool {
|
||||
errResp, exists := c.Query(SendResponse)
|
||||
if exists {
|
||||
@ -193,7 +201,11 @@ func (c *UserConnContext) ParseEssentialArgs() error {
|
||||
_, err := strconv.Atoi(platformIDStr)
|
||||
if err != nil {
|
||||
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")
|
||||
|
||||
}
|
||||
switch sdkType, _ := c.Query(SDKType); sdkType {
|
||||
case "", GoSDK, JsSDK:
|
||||
default:
|
||||
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package msggateway
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/openimsdk/tools/errs"
|
||||
)
|
||||
@ -28,12 +29,12 @@ type Encoder interface {
|
||||
|
||||
type GobEncoder struct{}
|
||||
|
||||
func NewGobEncoder() *GobEncoder {
|
||||
return &GobEncoder{}
|
||||
func NewGobEncoder() Encoder {
|
||||
return GobEncoder{}
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
||||
buff := bytes.Buffer{}
|
||||
func (g GobEncoder) Encode(data any) ([]byte, error) {
|
||||
var buff bytes.Buffer
|
||||
enc := gob.NewEncoder(&buff)
|
||||
if err := enc.Encode(data); err != nil {
|
||||
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
|
||||
@ -41,7 +42,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
func (g GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
buff := bytes.NewBuffer(encodeData)
|
||||
dec := gob.NewDecoder(buff)
|
||||
if err := dec.Decode(decodeData); err != nil {
|
||||
@ -49,3 +50,25 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type JsonEncoder struct{}
|
||||
|
||||
func NewJsonEncoder() Encoder {
|
||||
return JsonEncoder{}
|
||||
}
|
||||
|
||||
func (g JsonEncoder) Encode(data any) ([]byte, error) {
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return nil, errs.New("JsonEncoder.Encode failed", "action", "encode")
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
err := json.Unmarshal(encodeData, decodeData)
|
||||
if err != nil {
|
||||
return errs.New("JsonEncoder.Decode failed", "action", "decode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) OnlinePushMsg(
|
||||
context context.Context,
|
||||
req *msggateway.OnlinePushMsgReq,
|
||||
) (*msggateway.OnlinePushMsgResp, error) {
|
||||
func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Server) GetUsersOnlineStatus(
|
||||
ctx context.Context,
|
||||
req *msggateway.GetUsersOnlineStatusReq,
|
||||
) (*msggateway.GetUsersOnlineStatusResp, error) {
|
||||
func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
|
||||
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("only app manager")
|
||||
}
|
||||
@ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
|
||||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
||||
err := client.PushMessage(ctx, msgData)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
|
||||
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
||||
} else {
|
||||
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
||||
@ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) KickUserOffline(
|
||||
ctx context.Context,
|
||||
req *msggateway.KickUserOfflineReq,
|
||||
) (*msggateway.KickUserOfflineResp, error) {
|
||||
func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) {
|
||||
for _, v := range req.KickUserIDList {
|
||||
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
|
||||
if !ok {
|
||||
|
||||
@ -16,6 +16,7 @@ package msggateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
@ -31,6 +32,16 @@ import (
|
||||
"github.com/openimsdk/tools/utils/jsonutil"
|
||||
)
|
||||
|
||||
const (
|
||||
TextPing = "ping"
|
||||
TextPong = "pong"
|
||||
)
|
||||
|
||||
type TextMessage struct {
|
||||
Type string `json:"type"`
|
||||
Body json.RawMessage `json:"body"`
|
||||
}
|
||||
|
||||
type Req struct {
|
||||
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
|
||||
Token string `json:"token"`
|
||||
|
||||
@ -37,7 +37,6 @@ type LongConnServer interface {
|
||||
SetKickHandlerInfo(i *kickHandler)
|
||||
SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
|
||||
Compressor
|
||||
Encoder
|
||||
MessageHandler
|
||||
}
|
||||
|
||||
@ -61,7 +60,7 @@ type WsServer struct {
|
||||
authClient *rpcclient.Auth
|
||||
disCov discovery.SvcDiscoveryRegistry
|
||||
Compressor
|
||||
Encoder
|
||||
//Encoder
|
||||
MessageHandler
|
||||
webhookClient *webhook.Client
|
||||
}
|
||||
@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
|
||||
clients: newUserMap(),
|
||||
subscription: newSubscription(),
|
||||
Compressor: NewGzipCompressor(),
|
||||
Encoder: NewGobEncoder(),
|
||||
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
|
||||
}
|
||||
}
|
||||
@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
log.ZDebug(
|
||||
client.ctx,
|
||||
"user online",
|
||||
"online user Num",
|
||||
ws.onlineUserNum.Load(),
|
||||
"online user conn Num",
|
||||
ws.onlineUserConnNum.Load(),
|
||||
)
|
||||
log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
|
||||
}
|
||||
|
||||
func getRemoteAdders(client []*Client) string {
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
package body
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
)
|
||||
|
||||
@ -26,38 +27,44 @@ type Notification struct {
|
||||
|
||||
type Android struct {
|
||||
Alert string `json:"alert,omitempty"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Intent struct {
|
||||
URL string `json:"url,omitempty"`
|
||||
} `json:"intent,omitempty"`
|
||||
Extras Extras `json:"extras"`
|
||||
Extras map[string]string `json:"extras,omitempty"`
|
||||
}
|
||||
type Ios struct {
|
||||
Alert string `json:"alert,omitempty"`
|
||||
Sound string `json:"sound,omitempty"`
|
||||
Badge string `json:"badge,omitempty"`
|
||||
Extras Extras `json:"extras"`
|
||||
MutableContent bool `json:"mutable-content"`
|
||||
Alert IosAlert `json:"alert,omitempty"`
|
||||
Sound string `json:"sound,omitempty"`
|
||||
Badge string `json:"badge,omitempty"`
|
||||
Extras map[string]string `json:"extras,omitempty"`
|
||||
MutableContent bool `json:"mutable-content"`
|
||||
}
|
||||
|
||||
type Extras struct {
|
||||
ClientMsgID string `json:"clientMsgID"`
|
||||
type IosAlert struct {
|
||||
Title string `json:"title,omitempty"`
|
||||
Body string `json:"body,omitempty"`
|
||||
}
|
||||
|
||||
func (n *Notification) SetAlert(alert string) {
|
||||
func (n *Notification) SetAlert(alert string, title string, opts *options.Opts) {
|
||||
n.Alert = alert
|
||||
n.Android.Alert = alert
|
||||
n.IOS.Alert = alert
|
||||
n.IOS.Sound = "default"
|
||||
n.IOS.Badge = "+1"
|
||||
n.Android.Title = title
|
||||
n.IOS.Alert.Body = alert
|
||||
n.IOS.Alert.Title = title
|
||||
n.IOS.Sound = opts.IOSPushSound
|
||||
if opts.IOSBadgeCount {
|
||||
n.IOS.Badge = "+1"
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notification) SetExtras(extras Extras) {
|
||||
func (n *Notification) SetExtras(extras map[string]string) {
|
||||
n.IOS.Extras = extras
|
||||
n.Android.Extras = extras
|
||||
}
|
||||
|
||||
func (n *Notification) SetAndroidIntent(pushConf *config.Push) {
|
||||
n.Android.Intent.URL = pushConf.JPNS.PushIntent
|
||||
n.Android.Intent.URL = pushConf.JPush.PushIntent
|
||||
}
|
||||
|
||||
func (n *Notification) IOSEnableMutableContent() {
|
||||
|
||||
@ -18,9 +18,9 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush/body"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/tools/utils/httputil"
|
||||
)
|
||||
@ -57,17 +57,23 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
|
||||
var au body.Audience
|
||||
au.SetAlias(userIDs)
|
||||
var no body.Notification
|
||||
var extras body.Extras
|
||||
extras := make(map[string]string)
|
||||
extras["ex"] = opts.Ex
|
||||
if opts.Signal.ClientMsgID != "" {
|
||||
extras.ClientMsgID = opts.Signal.ClientMsgID
|
||||
extras["ClientMsgID"] = opts.Signal.ClientMsgID
|
||||
}
|
||||
no.IOSEnableMutableContent()
|
||||
no.SetExtras(extras)
|
||||
no.SetAlert(title)
|
||||
no.SetAlert(content, title, opts)
|
||||
no.SetAndroidIntent(j.pushConf)
|
||||
|
||||
var msg body.Message
|
||||
msg.SetMsgContent(content)
|
||||
msg.SetTitle(title)
|
||||
if opts.Signal.ClientMsgID != "" {
|
||||
msg.SetExtras("ClientMsgID", opts.Signal.ClientMsgID)
|
||||
}
|
||||
msg.SetExtras("ex", opts.Ex)
|
||||
var opt body.Options
|
||||
opt.SetApnsProduction(j.pushConf.IOSPush.Production)
|
||||
var pushObj body.PushObj
|
||||
@ -76,19 +82,26 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
|
||||
pushObj.SetNotification(&no)
|
||||
pushObj.SetMessage(&msg)
|
||||
pushObj.SetOptions(&opt)
|
||||
var resp any
|
||||
return j.request(ctx, pushObj, resp, 5)
|
||||
var resp map[string]any
|
||||
return j.request(ctx, pushObj, &resp, 5)
|
||||
}
|
||||
|
||||
func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error {
|
||||
return j.httpClient.PostReturn(
|
||||
func (j *JPush) request(ctx context.Context, po body.PushObj, resp *map[string]any, timeout int) error {
|
||||
err := j.httpClient.PostReturn(
|
||||
ctx,
|
||||
j.pushConf.JPNS.PushURL,
|
||||
j.pushConf.JPush.PushURL,
|
||||
map[string]string{
|
||||
"Authorization": j.getAuthorization(j.pushConf.JPNS.AppKey, j.pushConf.JPNS.MasterSecret),
|
||||
"Authorization": j.getAuthorization(j.pushConf.JPush.AppKey, j.pushConf.JPush.MasterSecret),
|
||||
},
|
||||
po,
|
||||
resp,
|
||||
timeout,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (*resp)["sendno"] != "0" {
|
||||
return fmt.Errorf("jpush push failed %v", resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -73,7 +73,7 @@ func (o *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (ti
|
||||
IsAtSelf bool `json:"isAtSelf"`
|
||||
}
|
||||
|
||||
opts = &options.Opts{Signal: &options.Signal{}}
|
||||
opts = &options.Opts{Signal: &options.Signal{ClientMsgID: msg.ClientMsgID}}
|
||||
if msg.OfflinePushInfo != nil {
|
||||
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
|
||||
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
|
||||
|
||||
@ -4,6 +4,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
@ -27,9 +31,6 @@ import (
|
||||
"github.com/openimsdk/tools/utils/timeutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ConsumerHandler struct {
|
||||
@ -335,6 +336,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri
|
||||
func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
|
||||
title, content, opts, err := c.getOfflinePushInfos(msg)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "getOfflinePushInfos failed", err, "msg", msg)
|
||||
return err
|
||||
}
|
||||
err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
|
||||
@ -364,7 +366,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten
|
||||
IsAtSelf bool `json:"isAtSelf"`
|
||||
}
|
||||
|
||||
opts = &options.Opts{Signal: &options.Signal{}}
|
||||
opts = &options.Opts{Signal: &options.Signal{ClientMsgID: msg.ClientMsgID}}
|
||||
if msg.OfflinePushInfo != nil {
|
||||
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
|
||||
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
|
||||
|
||||
@ -69,6 +69,7 @@ type Mongo struct {
|
||||
Database string `mapstructure:"database"`
|
||||
Username string `mapstructure:"username"`
|
||||
Password string `mapstructure:"password"`
|
||||
AuthSource string `mapstructure:"authSource"`
|
||||
MaxPoolSize int `mapstructure:"maxPoolSize"`
|
||||
MaxRetry int `mapstructure:"maxRetry"`
|
||||
}
|
||||
@ -212,12 +213,12 @@ type Push struct {
|
||||
FilePath string `mapstructure:"filePath"`
|
||||
AuthURL string `mapstructure:"authURL"`
|
||||
} `mapstructure:"fcm"`
|
||||
JPNS struct {
|
||||
JPush struct {
|
||||
AppKey string `mapstructure:"appKey"`
|
||||
MasterSecret string `mapstructure:"masterSecret"`
|
||||
PushURL string `mapstructure:"pushURL"`
|
||||
PushIntent string `mapstructure:"pushIntent"`
|
||||
} `mapstructure:"jpns"`
|
||||
} `mapstructure:"jpush"`
|
||||
IOSPush struct {
|
||||
PushSound string `mapstructure:"pushSound"`
|
||||
BadgeCount bool `mapstructure:"badgeCount"`
|
||||
@ -490,6 +491,7 @@ func (m *Mongo) Build() *mongoutil.Config {
|
||||
Database: m.Database,
|
||||
Username: m.Username,
|
||||
Password: m.Password,
|
||||
AuthSource: m.AuthSource,
|
||||
MaxPoolSize: m.MaxPoolSize,
|
||||
MaxRetry: m.MaxRetry,
|
||||
}
|
||||
|
||||
@ -2,15 +2,14 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/tools/log"
|
||||
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/tokenverify"
|
||||
)
|
||||
|
||||
|
||||
@ -1,41 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Wait for Kafka to be ready
|
||||
|
||||
KAFKA_SERVER=kafka-service:9092
|
||||
|
||||
MAX_ATTEMPTS=300
|
||||
attempt_num=1
|
||||
|
||||
echo "Waiting for Kafka to be ready..."
|
||||
|
||||
until /opt/bitnami/kafka/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_SERVER; do
|
||||
echo "Attempt $attempt_num of $MAX_ATTEMPTS: Kafka not ready yet..."
|
||||
if [ $attempt_num -eq $MAX_ATTEMPTS ]; then
|
||||
echo "Kafka not ready after $MAX_ATTEMPTS attempts, exiting"
|
||||
exit 1
|
||||
fi
|
||||
attempt_num=$((attempt_num+1))
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo "Kafka is ready. Creating topics..."
|
||||
|
||||
|
||||
topics=("toRedis" "toMongo" "toPush" "toOfflinePush")
|
||||
partitions=8
|
||||
replicationFactor=1
|
||||
|
||||
for topic in "${topics[@]}"; do
|
||||
if /opt/bitnami/kafka/bin/kafka-topics.sh --create \
|
||||
--bootstrap-server $KAFKA_SERVER \
|
||||
--replication-factor $replicationFactor \
|
||||
--partitions $partitions \
|
||||
--topic $topic
|
||||
then
|
||||
echo "Topic $topic created."
|
||||
else
|
||||
echo "Failed to create topic $topic."
|
||||
fi
|
||||
done
|
||||
|
||||
echo "All topics created."
|
||||
@ -1,66 +0,0 @@
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
mongosh <<EOF
|
||||
var maxRetries = 300;
|
||||
var connected = false;
|
||||
var rootUsername = '$MONGO_INITDB_ROOT_USERNAME';
|
||||
var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD';
|
||||
var dbName = '$MONGO_INITDB_DATABASE';
|
||||
var openimUsername = '$MONGO_OPENIM_USERNAME';
|
||||
var openimPassword = '$MONGO_OPENIM_PASSWORD';
|
||||
|
||||
while (!connected && maxRetries > 0) {
|
||||
try {
|
||||
db = connect('mongodb://127.0.0.1:27017/admin');
|
||||
var authResult = db.auth(rootUsername, rootPassword);
|
||||
if (authResult) {
|
||||
print('Authentication successful for root user: ' + rootUsername);
|
||||
connected = true;
|
||||
} else {
|
||||
print('Authentication failed for root user: ' + rootUsername + ' with password: ' + rootPassword);
|
||||
quit(1);
|
||||
}
|
||||
} catch (e) {
|
||||
maxRetries--;
|
||||
print('Connection failed, retrying... Remaining attempts: ' + maxRetries);
|
||||
sleep(1000); // Sleep for 1 second
|
||||
}
|
||||
}
|
||||
|
||||
if (connected) {
|
||||
db = db.getSiblingDB(dbName);
|
||||
var createUserResult = db.createUser({
|
||||
user: openimUsername,
|
||||
pwd: openimPassword,
|
||||
roles: [{
|
||||
role: 'readWrite',
|
||||
db: dbName
|
||||
}]
|
||||
});
|
||||
|
||||
if (createUserResult.ok == 1) {
|
||||
print('User creation successful. User: ' + openimUsername + ', Database: ' + dbName);
|
||||
} else {
|
||||
print('User creation failed for user: ' + openimUsername + ' in database: ' + dbName);
|
||||
quit(1);
|
||||
}
|
||||
} else {
|
||||
print('Failed to connect to MongoDB after 300 retries.');
|
||||
quit(1);
|
||||
}
|
||||
EOF
|
||||
|
||||
|
||||
|
||||
|
||||
@ -72,7 +72,7 @@ func CheckMinIO(ctx context.Context, config *config.Minio) error {
|
||||
}
|
||||
|
||||
func CheckKafka(ctx context.Context, conf *config.Kafka) error {
|
||||
return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic, conf.ToOfflinePushTopic})
|
||||
return kafka.CheckHealth(ctx, conf.Build())
|
||||
}
|
||||
|
||||
func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user