mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-23 22:19:55 +08:00
Merge branch 'openimsdk:pre-release-v3.8.4' into pre-release-v3.8.4
This commit is contained in:
commit
6f72b02be9
@ -31,7 +31,7 @@ joinGroupApplication:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: false
|
enable: true
|
||||||
title: joinGroupApplication title
|
title: joinGroupApplication title
|
||||||
desc: joinGroupApplication desc
|
desc: joinGroupApplication desc
|
||||||
ext: joinGroupApplication ext
|
ext: joinGroupApplication ext
|
||||||
@ -51,7 +51,7 @@ groupApplicationAccepted:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: false
|
enable: true
|
||||||
title: groupApplicationAccepted title
|
title: groupApplicationAccepted title
|
||||||
desc: groupApplicationAccepted desc
|
desc: groupApplicationAccepted desc
|
||||||
ext: groupApplicationAccepted ext
|
ext: groupApplicationAccepted ext
|
||||||
@ -61,7 +61,7 @@ groupApplicationRejected:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: false
|
enable: true
|
||||||
title: groupApplicationRejected title
|
title: groupApplicationRejected title
|
||||||
desc: groupApplicationRejected desc
|
desc: groupApplicationRejected desc
|
||||||
ext: groupApplicationRejected ext
|
ext: groupApplicationRejected ext
|
||||||
@ -198,7 +198,7 @@ friendApplicationAdded:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: false
|
enable: true
|
||||||
title: Somebody applies to add you as a friend
|
title: Somebody applies to add you as a friend
|
||||||
desc: Somebody applies to add you as a friend
|
desc: Somebody applies to add you as a friend
|
||||||
ext: Somebody applies to add you as a friend
|
ext: Somebody applies to add you as a friend
|
||||||
@ -228,7 +228,7 @@ friendAdded:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: We have become friends
|
title: We have become friends
|
||||||
desc: We have become friends
|
desc: We have become friends
|
||||||
ext: We have become friends
|
ext: We have become friends
|
||||||
@ -238,7 +238,7 @@ friendDeleted:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: deleted a friend
|
title: deleted a friend
|
||||||
desc: deleted a friend
|
desc: deleted a friend
|
||||||
ext: deleted a friend
|
ext: deleted a friend
|
||||||
@ -248,7 +248,7 @@ friendRemarkSet:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: Your friend's profile has been changed
|
title: Your friend's profile has been changed
|
||||||
desc: Your friend's profile has been changed
|
desc: Your friend's profile has been changed
|
||||||
ext: Your friend's profile has been changed
|
ext: Your friend's profile has been changed
|
||||||
@ -258,7 +258,7 @@ blackAdded:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: blocked a user
|
title: blocked a user
|
||||||
desc: blocked a user
|
desc: blocked a user
|
||||||
ext: blocked a user
|
ext: blocked a user
|
||||||
@ -268,7 +268,7 @@ blackDeleted:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: Remove a blocked user
|
title: Remove a blocked user
|
||||||
desc: Remove a blocked user
|
desc: Remove a blocked user
|
||||||
ext: Remove a blocked user
|
ext: Remove a blocked user
|
||||||
@ -278,7 +278,7 @@ friendInfoUpdated:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: friend info updated
|
title: friend info updated
|
||||||
desc: friend info updated
|
desc: friend info updated
|
||||||
ext: friend info updated
|
ext: friend info updated
|
||||||
@ -289,7 +289,7 @@ userInfoUpdated:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: userInfo updated
|
title: userInfo updated
|
||||||
desc: userInfo updated
|
desc: userInfo updated
|
||||||
ext: userInfo updated
|
ext: userInfo updated
|
||||||
@ -310,7 +310,7 @@ conversationChanged:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: conversation changed
|
title: conversation changed
|
||||||
desc: conversation changed
|
desc: conversation changed
|
||||||
ext: conversation changed
|
ext: conversation changed
|
||||||
@ -320,7 +320,7 @@ conversationSetPrivate:
|
|||||||
reliabilityLevel: 1
|
reliabilityLevel: 1
|
||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: false
|
||||||
title: burn after reading
|
title: burn after reading
|
||||||
desc: burn after reading
|
desc: burn after reading
|
||||||
ext: burn after reading
|
ext: burn after reading
|
@ -3,14 +3,7 @@ beforeSendSingleMsg:
|
|||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
failedContinue: true
|
failedContinue: true
|
||||||
# Only the contentType in allowedTypes will send the callback.
|
|
||||||
# Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-").
|
|
||||||
# e.g. allowedTypes: [1, 100, 200-500, 600-700] means that only contentType within the range
|
|
||||||
# {1, 100} ∪ [200, 500] ∪ [600, 700] will be allowed through the filter.
|
|
||||||
# If not set, all contentType messages will through this filter.
|
|
||||||
allowedTypes: []
|
|
||||||
# Only the contentType not in deniedTypes will send the callback.
|
# Only the contentType not in deniedTypes will send the callback.
|
||||||
# Supports two formats, same as allowedTypes.
|
|
||||||
# If not set, all contentType messages will through this filter.
|
# If not set, all contentType messages will through this filter.
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
beforeUpdateUserInfoEx:
|
beforeUpdateUserInfoEx:
|
||||||
@ -23,31 +16,30 @@ afterUpdateUserInfoEx:
|
|||||||
afterSendSingleMsg:
|
afterSendSingleMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
# Only the senID/recvID specified in attentionIds will send the callback
|
# Only the recvID specified in attentionIds will send the callback
|
||||||
# if not set, all user messages will be callback
|
# if not set, all user messages will be callback
|
||||||
attentionIds: []
|
attentionIds: []
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
allowedTypes: []
|
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
beforeSendGroupMsg:
|
beforeSendGroupMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
failedContinue: true
|
failedContinue: true
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
allowedTypes: []
|
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
beforeMsgModify:
|
beforeMsgModify:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
failedContinue: true
|
failedContinue: true
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
allowedTypes: []
|
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
afterSendGroupMsg:
|
afterSendGroupMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
|
# Only the recvID specified in attentionIds will send the callback
|
||||||
|
# if not set, all user messages will be callback
|
||||||
|
attentionIds: []
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
allowedTypes: []
|
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
afterUserOnline:
|
afterUserOnline:
|
||||||
enable: false
|
enable: false
|
||||||
|
4
go.mod
4
go.mod
@ -12,8 +12,8 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.71
|
github.com/openimsdk/protocol v0.0.72-alpha.79
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.72
|
github.com/openimsdk/tools v0.0.50-alpha.74
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
|
8
go.sum
8
go.sum
@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70=
|
github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY=
|
github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
|
github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
@ -73,7 +73,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) {
|
|||||||
func (cm *ConfigManager) GetConfigList(c *gin.Context) {
|
func (cm *ConfigManager) GetConfigList(c *gin.Context) {
|
||||||
var resp apistruct.GetConfigListResp
|
var resp apistruct.GetConfigListResp
|
||||||
resp.ConfigNames = cm.config.GetConfigNames()
|
resp.ConfigNames = cm.config.GetConfigNames()
|
||||||
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
|
resp.Environment = runtimeenv.RuntimeEnvironment()
|
||||||
resp.Version = version.Version
|
resp.Version = version.Version
|
||||||
|
|
||||||
apiresp.GinSuccess(c, resp)
|
apiresp.GinSuccess(c, resp)
|
||||||
|
@ -56,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()
|
config.RuntimeEnv = runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{
|
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{
|
||||||
config.Discovery.RpcService.MessageGateway,
|
config.Discovery.RpcService.MessageGateway,
|
||||||
|
@ -15,12 +15,19 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
"google.golang.org/protobuf/reflect/protoreflect"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
@ -36,6 +43,39 @@ import (
|
|||||||
"github.com/openimsdk/tools/utils/timeutil"
|
"github.com/openimsdk/tools/utils/timeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
msgDataDescriptor []protoreflect.FieldDescriptor
|
||||||
|
msgDataDescriptorOnce sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
func getMsgDataDescriptor() []protoreflect.FieldDescriptor {
|
||||||
|
msgDataDescriptorOnce.Do(func() {
|
||||||
|
skip := make(map[string]struct{})
|
||||||
|
respFields := new(msg.SendMsgResp).ProtoReflect().Descriptor().Fields()
|
||||||
|
for i := 0; i < respFields.Len(); i++ {
|
||||||
|
field := respFields.Get(i)
|
||||||
|
if !field.HasJSONName() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
skip[field.JSONName()] = struct{}{}
|
||||||
|
}
|
||||||
|
fields := new(sdkws.MsgData).ProtoReflect().Descriptor().Fields()
|
||||||
|
num := fields.Len()
|
||||||
|
msgDataDescriptor = make([]protoreflect.FieldDescriptor, 0, num)
|
||||||
|
for i := 0; i < num; i++ {
|
||||||
|
field := fields.Get(i)
|
||||||
|
if !field.HasJSONName() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := skip[field.JSONName()]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
msgDataDescriptor = append(msgDataDescriptor, fields.Get(i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return msgDataDescriptor
|
||||||
|
}
|
||||||
|
|
||||||
type MessageApi struct {
|
type MessageApi struct {
|
||||||
Client msg.MsgClient
|
Client msg.MsgClient
|
||||||
userClient *rpcli.UserClient
|
userClient *rpcli.UserClient
|
||||||
@ -171,6 +211,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
|
|||||||
data = apistruct.AtElem{}
|
data = apistruct.AtElem{}
|
||||||
case constant.Custom:
|
case constant.Custom:
|
||||||
data = apistruct.CustomElem{}
|
data = apistruct.CustomElem{}
|
||||||
|
case constant.MarkdownText:
|
||||||
|
data = apistruct.MarkdownTextElem{}
|
||||||
case constant.OANotification:
|
case constant.OANotification:
|
||||||
data = apistruct.OANotificationElem{}
|
data = apistruct.OANotificationElem{}
|
||||||
req.SessionType = constant.NotificationChatType
|
req.SessionType = constant.NotificationChatType
|
||||||
@ -190,6 +232,42 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
|
|||||||
return m.newUserSendMsgReq(c, &req), nil
|
return m.newUserSendMsgReq(c, &req), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MessageApi) getModifyFields(req, respModify *sdkws.MsgData) map[string]any {
|
||||||
|
if req == nil || respModify == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
fields := make(map[string]any)
|
||||||
|
reqProtoReflect := req.ProtoReflect()
|
||||||
|
respProtoReflect := respModify.ProtoReflect()
|
||||||
|
for _, descriptor := range getMsgDataDescriptor() {
|
||||||
|
reqValue := reqProtoReflect.Get(descriptor)
|
||||||
|
respValue := respProtoReflect.Get(descriptor)
|
||||||
|
if !reqValue.Equal(respValue) {
|
||||||
|
val := respValue.Interface()
|
||||||
|
name := descriptor.JSONName()
|
||||||
|
if name == "content" {
|
||||||
|
if bs, ok := val.([]byte); ok {
|
||||||
|
val = string(bs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fields[name] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(fields) == 0 {
|
||||||
|
fields = nil
|
||||||
|
}
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MessageApi) ginRespSendMsg(c *gin.Context, req *msg.SendMsgReq, resp *msg.SendMsgResp) {
|
||||||
|
res := m.getModifyFields(req.MsgData, resp.Modify)
|
||||||
|
resp.Modify = nil
|
||||||
|
apiresp.GinSuccess(c, &apistruct.SendMsgResp{
|
||||||
|
SendMsgResp: resp,
|
||||||
|
Modify: res,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
|
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
|
||||||
func (m *MessageApi) SendMessage(c *gin.Context) {
|
func (m *MessageApi) SendMessage(c *gin.Context) {
|
||||||
// Initialize a request struct for sending a message.
|
// Initialize a request struct for sending a message.
|
||||||
@ -243,7 +321,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Respond with a success message and the response payload.
|
// Respond with a success message and the response payload.
|
||||||
apiresp.GinSuccess(c, respPb)
|
m.ginRespSendMsg(c, sendMsgReq, respPb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
|
func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
|
||||||
@ -309,7 +387,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
|
|||||||
apiresp.GinError(c, err)
|
apiresp.GinError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
apiresp.GinSuccess(c, respPb)
|
m.ginRespSendMsg(c, &sendMsgReq, respPb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
||||||
@ -363,11 +441,92 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
|||||||
ClientMsgID: rpcResp.ClientMsgID,
|
ClientMsgID: rpcResp.ClientMsgID,
|
||||||
SendTime: rpcResp.SendTime,
|
SendTime: rpcResp.SendTime,
|
||||||
RecvID: recvID,
|
RecvID: recvID,
|
||||||
|
Modify: m.getModifyFields(sendMsgReq.MsgData, rpcResp.Modify),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
apiresp.GinSuccess(c, resp)
|
apiresp.GinSuccess(c, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
|
||||||
|
encodedKey, ok := c.GetQuery(webhook.Key)
|
||||||
|
if !ok {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
decodedData, err := base64.StdEncoding.DecodeString(encodedKey)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
req apistruct.SendSingleMsgReq
|
||||||
|
keyMsgData apistruct.KeyMsgData
|
||||||
|
|
||||||
|
sendID string
|
||||||
|
sessionType int32
|
||||||
|
recvID string
|
||||||
|
)
|
||||||
|
err = json.Unmarshal(decodedData, &keyMsgData)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if keyMsgData.GroupID != "" {
|
||||||
|
sessionType = constant.ReadGroupChatType
|
||||||
|
sendID = req.SendID
|
||||||
|
} else {
|
||||||
|
sessionType = constant.SingleChatType
|
||||||
|
sendID = keyMsgData.RecvID
|
||||||
|
recvID = keyMsgData.SendID
|
||||||
|
}
|
||||||
|
// check param
|
||||||
|
if keyMsgData.SendID == "" {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if sendID == "" {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
msgData := &sdkws.MsgData{
|
||||||
|
SendID: sendID,
|
||||||
|
RecvID: recvID,
|
||||||
|
GroupID: keyMsgData.GroupID,
|
||||||
|
ClientMsgID: idutil.GetMsgIDByMD5(sendID),
|
||||||
|
SenderPlatformID: constant.AdminPlatformID,
|
||||||
|
SessionType: sessionType,
|
||||||
|
MsgFrom: constant.UserMsgType,
|
||||||
|
ContentType: constant.Text,
|
||||||
|
Content: []byte(req.Content),
|
||||||
|
OfflinePushInfo: req.OfflinePushInfo,
|
||||||
|
Ex: req.Ex,
|
||||||
|
}
|
||||||
|
|
||||||
|
sendReq := &msg.SendMsgReq{
|
||||||
|
MsgData: msgData,
|
||||||
|
}
|
||||||
|
|
||||||
|
respPb, err := m.Client.SendMsg(c, sendReq)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var status = constant.MsgSendSuccessed
|
||||||
|
|
||||||
|
_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
|
||||||
|
Status: int32(status),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.ginRespSendMsg(c, sendReq, respPb)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) {
|
func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) {
|
||||||
a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client)
|
a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client)
|
||||||
}
|
}
|
||||||
|
@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
wg := errgroup.Group{}
|
wg := errgroup.Group{}
|
||||||
wg.SetLimit(concurrentRequest)
|
wg.SetLimit(concurrentRequest)
|
||||||
|
|
||||||
// Online push user online message to other node
|
// Online push user online message to other node
|
||||||
for _, v := range conns {
|
for _, v := range conns {
|
||||||
v := v
|
v := v
|
||||||
log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
|
log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn")
|
||||||
if v.Target() == ws.disCov.GetSelfConnTarget() {
|
if ws.disCov.IsSelfNode(v) {
|
||||||
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
|
log.ZDebug(ctx, "Filter out this node")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
|
|||||||
PlatformID: int32(client.PlatformID), Token: client.token,
|
PlatformID: int32(client.PlatformID), Token: client.token,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
|
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -74,7 +74,7 @@ type Config struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, index int, config *Config) error {
|
func Start(ctx context.Context, index int, config *Config) error {
|
||||||
runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
|
runTimeEnv := runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts",
|
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts",
|
||||||
config.MsgTransfer.Prometheus.Ports, "index", index)
|
config.MsgTransfer.Prometheus.Ports, "index", index)
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/go-redis/redis"
|
"github.com/go-redis/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
|
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
@ -37,7 +38,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
@ -21,9 +21,9 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
pbmsg "github.com/openimsdk/protocol/msg"
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,12 +16,14 @@ package fcm
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
|
||||||
"github.com/openimsdk/tools/utils/httputil"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||||
|
"github.com/openimsdk/tools/utils/httputil"
|
||||||
|
|
||||||
firebase "firebase.google.com/go/v4"
|
firebase "firebase.google.com/go/v4"
|
||||||
"firebase.google.com/go/v4/messaging"
|
"firebase.google.com/go/v4/messaging"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -133,7 +135,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
|
|||||||
unreadCountSum, err := f.cache.GetUserBadgeUnreadCountSum(ctx, userID)
|
unreadCountSum, err := f.cache.GetUserBadgeUnreadCountSum(ctx, userID)
|
||||||
if err == nil && unreadCountSum != 0 {
|
if err == nil && unreadCountSum != 0 {
|
||||||
apns.Payload.Aps.Badge = &unreadCountSum
|
apns.Payload.Aps.Badge = &unreadCountSum
|
||||||
} else if err == redis.Nil || unreadCountSum == 0 {
|
} else if errors.Is(err, redis.Nil) || unreadCountSum == 0 {
|
||||||
zero := 1
|
zero := 1
|
||||||
apns.Payload.Aps.Badge = &zero
|
apns.Payload.Aps.Badge = &zero
|
||||||
} else {
|
} else {
|
||||||
|
@ -18,6 +18,16 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
incOne = datautil.ToPtr("+1")
|
||||||
|
addNum = "1"
|
||||||
|
defaultStrategy = strategy{
|
||||||
|
Default: 1,
|
||||||
|
}
|
||||||
|
msgCategory = "CATEGORY_MESSAGE"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Resp struct {
|
type Resp struct {
|
||||||
@ -59,6 +69,23 @@ type TaskResp struct {
|
|||||||
|
|
||||||
type Settings struct {
|
type Settings struct {
|
||||||
TTL *int64 `json:"ttl"`
|
TTL *int64 `json:"ttl"`
|
||||||
|
Strategy strategy `json:"strategy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type strategy struct {
|
||||||
|
Default int64 `json:"default"`
|
||||||
|
//IOS int64 `json:"ios"`
|
||||||
|
//St int64 `json:"st"`
|
||||||
|
//Hw int64 `json:"hw"`
|
||||||
|
//Ho int64 `json:"ho"`
|
||||||
|
//XM int64 `json:"xm"`
|
||||||
|
//XMG int64 `json:"xmg"`
|
||||||
|
//VV int64 `json:"vv"`
|
||||||
|
//Op int64 `json:"op"`
|
||||||
|
//OpG int64 `json:"opg"`
|
||||||
|
//MZ int64 `json:"mz"`
|
||||||
|
//HosHw int64 `json:"hoshw"`
|
||||||
|
//WX int64 `json:"wx"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Audience struct {
|
type Audience struct {
|
||||||
@ -112,6 +139,8 @@ type Notification struct {
|
|||||||
ChannelID string `json:"channelID"`
|
ChannelID string `json:"channelID"`
|
||||||
ChannelName string `json:"ChannelName"`
|
ChannelName string `json:"ChannelName"`
|
||||||
ClickType string `json:"click_type"`
|
ClickType string `json:"click_type"`
|
||||||
|
BadgeAddNum string `json:"badge_add_num"`
|
||||||
|
Category string `json:"category"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
@ -120,6 +149,7 @@ type Options struct {
|
|||||||
ChannelID string `json:"/message/android/notification/channel_id"`
|
ChannelID string `json:"/message/android/notification/channel_id"`
|
||||||
Sound string `json:"/message/android/notification/sound"`
|
Sound string `json:"/message/android/notification/sound"`
|
||||||
Importance string `json:"/message/android/notification/importance"`
|
Importance string `json:"/message/android/notification/importance"`
|
||||||
|
Category string `json:"/message/android/category"`
|
||||||
} `json:"HW"`
|
} `json:"HW"`
|
||||||
XM struct {
|
XM struct {
|
||||||
ChannelID string `json:"/extra.channel_id"`
|
ChannelID string `json:"/extra.channel_id"`
|
||||||
@ -140,6 +170,8 @@ func newPushReq(pushConf *config.Push, title, content string) PushReq {
|
|||||||
ClickType: "startapp",
|
ClickType: "startapp",
|
||||||
ChannelID: pushConf.GeTui.ChannelID,
|
ChannelID: pushConf.GeTui.ChannelID,
|
||||||
ChannelName: pushConf.GeTui.ChannelName,
|
ChannelName: pushConf.GeTui.ChannelName,
|
||||||
|
BadgeAddNum: addNum,
|
||||||
|
Category: msgCategory,
|
||||||
}}}
|
}}}
|
||||||
return pushReq
|
return pushReq
|
||||||
}
|
}
|
||||||
@ -156,6 +188,7 @@ func (pushReq *PushReq) setPushChannel(title string, body string) {
|
|||||||
notify := "notify"
|
notify := "notify"
|
||||||
pushReq.PushChannel.Ios.NotificationType = ¬ify
|
pushReq.PushChannel.Ios.NotificationType = ¬ify
|
||||||
pushReq.PushChannel.Ios.Aps.Sound = "default"
|
pushReq.PushChannel.Ios.Aps.Sound = "default"
|
||||||
|
pushReq.PushChannel.Ios.AutoBadge = incOne
|
||||||
pushReq.PushChannel.Ios.Aps.Alert = Alert{
|
pushReq.PushChannel.Ios.Aps.Alert = Alert{
|
||||||
Title: title,
|
Title: title,
|
||||||
Body: body,
|
Body: body,
|
||||||
@ -172,7 +205,8 @@ func (pushReq *PushReq) setPushChannel(title string, body string) {
|
|||||||
ChannelID string `json:"/message/android/notification/channel_id"`
|
ChannelID string `json:"/message/android/notification/channel_id"`
|
||||||
Sound string `json:"/message/android/notification/sound"`
|
Sound string `json:"/message/android/notification/sound"`
|
||||||
Importance string `json:"/message/android/notification/importance"`
|
Importance string `json:"/message/android/notification/importance"`
|
||||||
}{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL"},
|
Category string `json:"/message/android/category"`
|
||||||
|
}{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL", Category: "IM"},
|
||||||
XM: struct {
|
XM: struct {
|
||||||
ChannelID string `json:"/extra.channel_id"`
|
ChannelID string `json:"/extra.channel_id"`
|
||||||
}{ChannelID: "high_system"},
|
}{ChannelID: "high_system"},
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -70,7 +71,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client {
|
|||||||
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
|
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
|
||||||
token, err := g.cache.GetGetuiToken(ctx)
|
token, err := g.cache.GetGetuiToken(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errs.Unwrap(err) == redis.Nil {
|
if errors.Is(err, redis.Nil) {
|
||||||
log.ZDebug(ctx, "getui token not exist in redis")
|
log.ZDebug(ctx, "getui token not exist in redis")
|
||||||
token, err = g.getTokenAndSave2Redis(ctx)
|
token, err = g.getTokenAndSave2Redis(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -144,7 +145,7 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir
|
|||||||
func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) {
|
func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) {
|
||||||
respTask := TaskResp{}
|
respTask := TaskResp{}
|
||||||
ttl := int64(1000 * 60 * 5)
|
ttl := int64(1000 * 60 * 5)
|
||||||
pushReq.Settings = &Settings{TTL: &ttl}
|
pushReq.Settings = &Settings{TTL: &ttl, Strategy: defaultStrategy}
|
||||||
err := g.request(ctx, taskURL, pushReq, token, &respTask)
|
err := g.request(ctx, taskURL, pushReq, token, &respTask)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errs.Wrap(err)
|
return "", errs.Wrap(err)
|
||||||
@ -188,6 +189,7 @@ func (g *Client) postReturn(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
log.ZDebug(ctx, "postReturn", "url", url, "header", header, "input", input, "timeout", timeout, "output", output)
|
||||||
return output.parseError()
|
return output.parseError()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,7 +206,7 @@ func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
|
func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
|
||||||
pushReq.Settings = &Settings{TTL: &g.taskIDTTL}
|
pushReq.Settings = &Settings{TTL: &g.taskIDTTL, Strategy: defaultStrategy}
|
||||||
taskID, err = g.GetTaskID(ctx, token, pushReq)
|
taskID, err = g.GetTaskID(ctx, token, pushReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -7,12 +7,12 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbpush "github.com/openimsdk/protocol/push"
|
pbpush "github.com/openimsdk/protocol/push"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/utils/jsonutil"
|
"github.com/openimsdk/tools/utils/jsonutil"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
@ -166,7 +166,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
|
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
|
||||||
var usersConns = make(map[*grpc.ClientConn][]string)
|
var usersConns = make(map[grpc.ClientConnInterface][]string)
|
||||||
for host, userIds := range usersHost {
|
for host, userIds := range usersHost {
|
||||||
tconn, _ := k.disCov.GetConn(ctx, host)
|
tconn, _ := k.disCov.GetConn(ctx, host)
|
||||||
usersConns[tconn] = userIds
|
usersConns[tconn] = userIds
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||||
@ -25,7 +26,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/jsonutil"
|
"github.com/openimsdk/tools/utils/jsonutil"
|
||||||
"github.com/openimsdk/tools/utils/timeutil"
|
"github.com/openimsdk/tools/utils/timeutil"
|
||||||
@ -208,7 +208,10 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
|
|||||||
if !isOfflinePush {
|
if !isOfflinePush {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if msg.ContentType == constant.SignalingNotification {
|
switch msg.ContentType {
|
||||||
|
case constant.RoomParticipantsConnectedNotification:
|
||||||
|
return false
|
||||||
|
case constant.RoomParticipantsDisconnectedNotification:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, v := range conns {
|
for _, v := range conns {
|
||||||
log.ZDebug(ctx, "forceKickOff", "conn", v.Target())
|
log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID)
|
||||||
client := msggateway.NewMsgGatewayClient(v)
|
client := msggateway.NewMsgGatewayClient(v)
|
||||||
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
|
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
|
||||||
_, err := client.KickUserOffline(ctx, kickReq)
|
_, err := client.KickUserOffline(ctx, kickReq)
|
||||||
|
@ -17,13 +17,14 @@ package group
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -392,6 +393,8 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
|
|||||||
if err := g.PopulateGroupMember(ctx, groupMember); err != nil {
|
if err := g.PopulateGroupMember(ctx, groupMember); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
opUserID = mcontext.GetOpUserID(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := g.webhookBeforeInviteUserToGroup(ctx, &g.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue {
|
if err := g.webhookBeforeInviteUserToGroup(ctx, &g.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue {
|
||||||
@ -427,6 +430,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var groupMembers []*model.GroupMember
|
var groupMembers []*model.GroupMember
|
||||||
for _, userID := range req.InvitedUserIDs {
|
for _, userID := range req.InvitedUserIDs {
|
||||||
member := &model.GroupMember{
|
member := &model.GroupMember{
|
||||||
|
@ -16,7 +16,13 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
|
|
||||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -94,7 +100,7 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config
|
|||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||||
RecvID: msg.MsgData.RecvID,
|
RecvID: msg.MsgData.RecvID,
|
||||||
}
|
}
|
||||||
m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after)
|
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
||||||
@ -128,14 +134,15 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.
|
|||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||||
GroupID: msg.MsgData.GroupID,
|
GroupID: msg.MsgData.GroupID,
|
||||||
}
|
}
|
||||||
m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after)
|
|
||||||
|
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
if msg.MsgData.ContentType != constant.Text {
|
//if msg.MsgData.ContentType != constant.Text {
|
||||||
return nil
|
// return nil
|
||||||
}
|
//}
|
||||||
if !filterBeforeMsg(msg, before) {
|
if !filterBeforeMsg(msg, before) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -146,9 +153,14 @@ func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.B
|
|||||||
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if beforeMsgData != nil {
|
||||||
|
*beforeMsgData = proto.Clone(msg.MsgData).(*sdkws.MsgData)
|
||||||
|
}
|
||||||
if resp.Content != nil {
|
if resp.Content != nil {
|
||||||
msg.MsgData.Content = []byte(*resp.Content)
|
msg.MsgData.Content = []byte(*resp.Content)
|
||||||
|
if err := json.Unmarshal(msg.MsgData.Content, &struct{}{}); err != nil {
|
||||||
|
return errs.ErrArgs.WrapMsg("webhook msg modify content is not json", "content", string(msg.MsgData.Content))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo)
|
datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo)
|
||||||
datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID)
|
datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID)
|
||||||
@ -192,3 +204,15 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
|
|||||||
}
|
}
|
||||||
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||||
|
keyMsgData := apistruct.KeyMsgData{
|
||||||
|
SendID: msg.SendID,
|
||||||
|
RecvID: msg.RecvID,
|
||||||
|
GroupID: msg.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return map[string]string{
|
||||||
|
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
package msg
|
package msg
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
pbchat "github.com/openimsdk/protocol/msg"
|
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
pbchat "github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -13,28 +15,50 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func filterAfterMsg(msg *pbchat.SendMsgReq, after *config.AfterConfig) bool {
|
func filterAfterMsg(msg *pbchat.SendMsgReq, after *config.AfterConfig) bool {
|
||||||
return filterMsg(msg, after.AttentionIds, after.AllowedTypes, after.DeniedTypes)
|
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterBeforeMsg(msg *pbchat.SendMsgReq, before *config.BeforeConfig) bool {
|
func filterBeforeMsg(msg *pbchat.SendMsgReq, before *config.BeforeConfig) bool {
|
||||||
return filterMsg(msg, nil, before.AllowedTypes, before.DeniedTypes)
|
return filterMsg(msg, nil, before.DeniedTypes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterMsg(msg *pbchat.SendMsgReq, attentionIds, allowedTypes, deniedTypes []string) bool {
|
func filterMsg(msg *pbchat.SendMsgReq, attentionIds []string, deniedTypes []int32) bool {
|
||||||
// According to the attentionIds configuration, only some users are sent
|
// According to the attentionIds configuration, only some users are sent
|
||||||
if len(attentionIds) != 0 && !datautil.Contains([]string{msg.MsgData.SendID, msg.MsgData.RecvID}, attentionIds...) {
|
if len(attentionIds) != 0 && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) {
|
|
||||||
|
if defaultDeniedTypes(msg.MsgData.ContentType) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) {
|
|
||||||
|
if len(deniedTypes) != 0 && datautil.Contain(msg.MsgData.ContentType, deniedTypes...) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
//if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) {
|
||||||
|
// return false
|
||||||
|
//}
|
||||||
|
//if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) {
|
||||||
|
// return false
|
||||||
|
//}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func isInInterval(contentType int32, interval []string) bool {
|
func defaultDeniedTypes(contentType int32) bool {
|
||||||
|
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if contentType == constant.Typing {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// isInInterval if data is in interval
|
||||||
|
// Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-")
|
||||||
|
// e.g. [1, 100, 200-500, 600-700] means that only data within the range
|
||||||
|
// {1, 100} ∪ [200, 500] ∪ [600, 700] will return true.
|
||||||
|
func isInInterval(data int32, interval []string) bool {
|
||||||
for _, v := range interval {
|
for _, v := range interval {
|
||||||
if strings.Contains(v, separator) {
|
if strings.Contains(v, separator) {
|
||||||
// is interval
|
// is interval
|
||||||
@ -50,7 +74,7 @@ func isInInterval(contentType int32, interval []string) bool {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if datautil.BetweenEq(int(contentType), bottom, top) {
|
if datautil.BetweenEq(int(data), bottom, top) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -58,7 +82,7 @@ func isInInterval(contentType int32, interval []string) bool {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if int(contentType) == iv {
|
if int(data) == iv {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,26 +29,39 @@ import (
|
|||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
||||||
if req.MsgData != nil {
|
if req.MsgData == nil {
|
||||||
|
return nil, errs.ErrArgs.WrapMsg("msgData is nil")
|
||||||
|
}
|
||||||
|
before := new(*sdkws.MsgData)
|
||||||
|
resp, err := m.sendMsg(ctx, req, before)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if *before != nil && proto.Equal(*before, req.MsgData) == false {
|
||||||
|
resp.Modify = req.MsgData
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) {
|
||||||
m.encapsulateMsgData(req.MsgData)
|
m.encapsulateMsgData(req.MsgData)
|
||||||
switch req.MsgData.SessionType {
|
switch req.MsgData.SessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
return m.sendMsgSingleChat(ctx, req)
|
return m.sendMsgSingleChat(ctx, req, before)
|
||||||
case constant.NotificationChatType:
|
case constant.NotificationChatType:
|
||||||
return m.sendMsgNotification(ctx, req)
|
return m.sendMsgNotification(ctx, req, before)
|
||||||
case constant.ReadGroupChatType:
|
case constant.ReadGroupChatType:
|
||||||
return m.sendMsgGroupChat(ctx, req)
|
return m.sendMsgGroupChat(ctx, req, before)
|
||||||
default:
|
default:
|
||||||
return nil, errs.ErrArgs.WrapMsg("unknown sessionType")
|
return nil, errs.ErrArgs.WrapMsg("unknown sessionType")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return nil, errs.ErrArgs.WrapMsg("msgData is nil")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
|
||||||
if err = m.messageVerification(ctx, req); err != nil {
|
if err = m.messageVerification(ctx, req); err != nil {
|
||||||
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
|
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -57,7 +70,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq)
|
|||||||
if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil {
|
if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
|
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
|
err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
|
||||||
@ -139,7 +152,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
|
||||||
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -151,7 +164,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
|
||||||
if err := m.messageVerification(ctx, req); err != nil {
|
if err := m.messageVerification(ctx, req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -171,12 +184,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
}
|
}
|
||||||
if !isSend {
|
if !isSend {
|
||||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, nil
|
return nil, errs.ErrArgs.WrapMsg("message is not sent")
|
||||||
} else {
|
} else {
|
||||||
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
|
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
||||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -16,13 +16,15 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/encrypt"
|
"github.com/openimsdk/tools/utils/encrypt"
|
||||||
"github.com/openimsdk/tools/utils/timeutil"
|
"github.com/openimsdk/tools/utils/timeutil"
|
||||||
"math/rand"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
@ -62,6 +64,13 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
|
|||||||
if err := m.webhookBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig.BeforeSendSingleMsg, data); err != nil {
|
if err := m.webhookBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig.BeforeSendSingleMsg, data); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
u, err := m.UserLocalCache.GetUserInfo(ctx, data.MsgData.SendID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if authverify.CheckSystemAccount(ctx, u.AppMangerLevel) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
black, err := m.FriendLocalCache.IsBlack(ctx, data.MsgData.SendID, data.MsgData.RecvID)
|
black, err := m.FriendLocalCache.IsBlack(ctx, data.MsgData.SendID, data.MsgData.RecvID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -137,27 +146,9 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
|
|||||||
msg.SendTime = timeutil.GetCurrentTimestampByMill()
|
msg.SendTime = timeutil.GetCurrentTimestampByMill()
|
||||||
}
|
}
|
||||||
switch msg.ContentType {
|
switch msg.ContentType {
|
||||||
case constant.Text:
|
case constant.Text, constant.Picture, constant.Voice, constant.Video,
|
||||||
fallthrough
|
constant.File, constant.AtText, constant.Merger, constant.Card,
|
||||||
case constant.Picture:
|
constant.Location, constant.Custom, constant.Quote, constant.AdvancedText, constant.MarkdownText:
|
||||||
fallthrough
|
|
||||||
case constant.Voice:
|
|
||||||
fallthrough
|
|
||||||
case constant.Video:
|
|
||||||
fallthrough
|
|
||||||
case constant.File:
|
|
||||||
fallthrough
|
|
||||||
case constant.AtText:
|
|
||||||
fallthrough
|
|
||||||
case constant.Merger:
|
|
||||||
fallthrough
|
|
||||||
case constant.Card:
|
|
||||||
fallthrough
|
|
||||||
case constant.Location:
|
|
||||||
fallthrough
|
|
||||||
case constant.Custom:
|
|
||||||
fallthrough
|
|
||||||
case constant.Quote:
|
|
||||||
case constant.Revoke:
|
case constant.Revoke:
|
||||||
datautil.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
|
datautil.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
|
||||||
datautil.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
|
datautil.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
|
||||||
|
@ -19,11 +19,12 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
@ -37,7 +38,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) {
|
func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) {
|
||||||
limit := t.s3dataBase.PartLimit()
|
limit, err := t.s3dataBase.PartLimit()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return &third.PartLimitResp{
|
return &third.PartLimitResp{
|
||||||
MinPartSize: limit.MinPartSize,
|
MinPartSize: limit.MinPartSize,
|
||||||
MaxPartSize: limit.MaxPartSize,
|
MaxPartSize: limit.MaxPartSize,
|
||||||
|
@ -566,7 +566,7 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convert users to response format
|
// Convert users to response format
|
||||||
resp := s.userModelToResp(users, req.Pagination)
|
resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel)
|
||||||
if resp.Total != 0 {
|
if resp.Total != 0 {
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@ -576,17 +576,24 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp = s.userModelToResp(users, req.Pagination)
|
resp = s.userModelToResp(users, req.Pagination, req.AppManagerLevel)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no keyword, find users with notification settings
|
// If no keyword, find users with notification settings
|
||||||
users, err = s.db.FindNotification(ctx, constant.AppNotificationAdmin)
|
if req.AppManagerLevel != nil {
|
||||||
|
users, err = s.db.FindNotification(ctx, int64(*req.AppManagerLevel))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
users, err = s.db.FindSystemAccount(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
resp := s.userModelToResp(users, req.Pagination)
|
resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -625,11 +632,16 @@ func (s *userServer) genUserID() string {
|
|||||||
return string(data)
|
return string(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination) *pbuser.SearchNotificationAccountResp {
|
func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination, appManagerLevel *int32) *pbuser.SearchNotificationAccountResp {
|
||||||
accounts := make([]*pbuser.NotificationAccountInfo, 0)
|
accounts := make([]*pbuser.NotificationAccountInfo, 0)
|
||||||
var total int64
|
var total int64
|
||||||
for _, v := range users {
|
for _, v := range users {
|
||||||
if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) {
|
if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) {
|
||||||
|
if appManagerLevel != nil {
|
||||||
|
if v.AppMangerLevel != *appManagerLevel {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
temp := &pbuser.NotificationAccountInfo{
|
temp := &pbuser.NotificationAccountInfo{
|
||||||
UserID: v.UserID,
|
UserID: v.UserID,
|
||||||
FaceURL: v.FaceURL,
|
FaceURL: v.FaceURL,
|
||||||
|
@ -31,7 +31,7 @@ type CronTaskConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, conf *CronTaskConfig) error {
|
func Start(ctx context.Context, conf *CronTaskConfig) error {
|
||||||
conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
|
conf.runTimeEnv = runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
||||||
if conf.CronTask.RetainChatRecords < 1 {
|
if conf.CronTask.RetainChatRecords < 1 {
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package apistruct
|
package apistruct
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -111,6 +112,21 @@ type BatchSendMsgResp struct {
|
|||||||
FailedIDs []string `json:"failedUserIDs"`
|
FailedIDs []string `json:"failedUserIDs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendSingleMsgReq defines the structure for sending a message to multiple recipients.
|
||||||
|
type SendSingleMsgReq struct {
|
||||||
|
// groupMsg should appoint sendID
|
||||||
|
SendID string `json:"sendID"`
|
||||||
|
Content string `json:"content" binding:"required"`
|
||||||
|
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
|
||||||
|
Ex string `json:"ex"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyMsgData struct {
|
||||||
|
SendID string `json:"sendID"`
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
// SingleReturnResult encapsulates the result of a single message send attempt.
|
// SingleReturnResult encapsulates the result of a single message send attempt.
|
||||||
type SingleReturnResult struct {
|
type SingleReturnResult struct {
|
||||||
// ServerMsgID is the message identifier on the server-side.
|
// ServerMsgID is the message identifier on the server-side.
|
||||||
@ -124,4 +140,15 @@ type SingleReturnResult struct {
|
|||||||
|
|
||||||
// RecvID uniquely identifies the receiver of the message.
|
// RecvID uniquely identifies the receiver of the message.
|
||||||
RecvID string `json:"recvID"`
|
RecvID string `json:"recvID"`
|
||||||
|
|
||||||
|
// Modify fields modified via webhook.
|
||||||
|
Modify map[string]any `json:"modify,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SendMsgResp struct {
|
||||||
|
// SendMsgResp original response.
|
||||||
|
*pbmsg.SendMsgResp
|
||||||
|
|
||||||
|
// Modify fields modified via webhook.
|
||||||
|
Modify map[string]any `json:"modify,omitempty"`
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,15 @@ type TextElem struct {
|
|||||||
Content string `json:"content" validate:"required"`
|
Content string `json:"content" validate:"required"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MarkdownTextElem struct {
|
||||||
|
Content string `mapstructure:"content" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type StreamMsgElem struct {
|
||||||
|
Type string `mapstructure:"type" validate:"required"`
|
||||||
|
Content string `mapstructure:"content" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
type RevokeElem struct {
|
type RevokeElem struct {
|
||||||
RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"`
|
RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"`
|
||||||
}
|
}
|
||||||
|
@ -1 +0,0 @@
|
|||||||
package apistruct
|
|
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
"github.com/golang-jwt/jwt/v4"
|
"github.com/golang-jwt/jwt/v4"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
)
|
)
|
||||||
@ -55,3 +56,7 @@ func CheckAdmin(ctx context.Context, imAdminUserID []string) error {
|
|||||||
func IsManagerUserID(opUserID string, imAdminUserID []string) bool {
|
func IsManagerUserID(opUserID string, imAdminUserID []string) bool {
|
||||||
return datautil.Contain(opUserID, imAdminUserID...)
|
return datautil.Contain(opUserID, imAdminUserID...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CheckSystemAccount(ctx context.Context, level int32) bool {
|
||||||
|
return level >= constant.AppAdmin
|
||||||
|
}
|
||||||
|
@ -86,7 +86,7 @@ func (r *RootCmd) initEtcd() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
disConfig := config.Discovery{}
|
disConfig := config.Discovery{}
|
||||||
env := runtimeenv.PrintRuntimeEnvironment()
|
env := runtimeenv.RuntimeEnvironment()
|
||||||
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename],
|
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename],
|
||||||
env, &disConfig)
|
env, &disConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -125,7 +125,7 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
runtimeEnv := runtimeenv.PrintRuntimeEnvironment()
|
runtimeEnv := runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
// Load common configuration file
|
// Load common configuration file
|
||||||
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
|
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
|
||||||
|
@ -18,9 +18,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
"github.com/openimsdk/tools/db/redisutil"
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/s3/aws"
|
"github.com/openimsdk/tools/s3/aws"
|
||||||
"github.com/openimsdk/tools/s3/cos"
|
"github.com/openimsdk/tools/s3/cos"
|
||||||
"github.com/openimsdk/tools/s3/kodo"
|
"github.com/openimsdk/tools/s3/kodo"
|
||||||
@ -364,19 +364,17 @@ type Redis struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type BeforeConfig struct {
|
type BeforeConfig struct {
|
||||||
Enable bool `mapstructure:"enable"`
|
Enable bool `yaml:"enable"`
|
||||||
Timeout int `mapstructure:"timeout"`
|
Timeout int `yaml:"timeout"`
|
||||||
FailedContinue bool `mapstructure:"failedContinue"`
|
FailedContinue bool `yaml:"failedContinue"`
|
||||||
AllowedTypes []string `mapstructure:"allowedTypes"`
|
DeniedTypes []int32 `yaml:"deniedTypes"`
|
||||||
DeniedTypes []string `mapstructure:"deniedTypes"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type AfterConfig struct {
|
type AfterConfig struct {
|
||||||
Enable bool `mapstructure:"enable"`
|
Enable bool `yaml:"enable"`
|
||||||
Timeout int `mapstructure:"timeout"`
|
Timeout int `yaml:"timeout"`
|
||||||
AttentionIds []string `mapstructure:"attentionIds"`
|
AttentionIds []string `yaml:"attentionIds"`
|
||||||
AllowedTypes []string `mapstructure:"allowedTypes"`
|
DeniedTypes []int32 `yaml:"deniedTypes"`
|
||||||
DeniedTypes []string `mapstructure:"deniedTypes"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Share struct {
|
type Share struct {
|
||||||
|
@ -70,7 +70,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
|
runTimeEnv := runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
if !autoSetPorts {
|
if !autoSetPorts {
|
||||||
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
|
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
|
||||||
@ -177,6 +177,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = client.Register(
|
err = client.Register(
|
||||||
|
ctx,
|
||||||
rpcRegisterName,
|
rpcRegisterName,
|
||||||
registerIP,
|
registerIP,
|
||||||
port,
|
port,
|
||||||
|
4
pkg/common/storage/cache/redis/lua_script.go
vendored
4
pkg/common/storage/cache/redis/lua_script.go
vendored
@ -2,7 +2,9 @@ package redis
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
@ -56,7 +58,7 @@ func callLua(ctx context.Context, rdb redis.Scripter, script *redis.Script, keys
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
v, err := r.Result()
|
v, err := r.Result()
|
||||||
if err == redis.Nil {
|
if errors.Is(err, redis.Nil) {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
return v, errs.WrapMsg(err, "call lua err", "scriptHash", script.Hash(), "keys", keys, "args", args)
|
return v, errs.WrapMsg(err, "call lua err", "scriptHash", script.Hash(), "keys", keys, "args", args)
|
||||||
|
@ -33,12 +33,12 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbmsg "github.com/openimsdk/protocol/msg"
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -9,12 +9,12 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
pbmsg "github.com/openimsdk/protocol/msg"
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,10 +19,10 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/protocol/push"
|
"github.com/openimsdk/protocol/push"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PushDatabase interface {
|
type PushDatabase interface {
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type S3Database interface {
|
type S3Database interface {
|
||||||
PartLimit() *s3.PartLimit
|
PartLimit() (*s3.PartLimit, error)
|
||||||
PartSize(ctx context.Context, size int64) (int64, error)
|
PartSize(ctx context.Context, size int64) (int64, error)
|
||||||
AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
|
AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
|
||||||
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
|
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
|
||||||
@ -65,7 +65,7 @@ func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
|
|||||||
return s.s3.PartSize(ctx, size)
|
return s.s3.PartSize(ctx, size)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3Database) PartLimit() *s3.PartLimit {
|
func (s *s3Database) PartLimit() (*s3.PartLimit, error) {
|
||||||
return s.s3.PartLimit()
|
return s.s3.PartLimit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/tools/db/pagination"
|
"github.com/openimsdk/tools/db/pagination"
|
||||||
"github.com/openimsdk/tools/db/tx"
|
"github.com/openimsdk/tools/db/tx"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
@ -37,8 +38,10 @@ type UserDatabase interface {
|
|||||||
Find(ctx context.Context, userIDs []string) (users []*model.User, err error)
|
Find(ctx context.Context, userIDs []string) (users []*model.User, err error)
|
||||||
// Find userInfo By Nickname
|
// Find userInfo By Nickname
|
||||||
FindByNickname(ctx context.Context, nickname string) (users []*model.User, err error)
|
FindByNickname(ctx context.Context, nickname string) (users []*model.User, err error)
|
||||||
// Find notificationAccounts
|
// FindNotification find system account by level
|
||||||
FindNotification(ctx context.Context, level int64) (users []*model.User, err error)
|
FindNotification(ctx context.Context, level int64) (users []*model.User, err error)
|
||||||
|
// FindSystemAccount find all system account
|
||||||
|
FindSystemAccount(ctx context.Context) (users []*model.User, err error)
|
||||||
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage
|
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage
|
||||||
Create(ctx context.Context, users []*model.User) (err error)
|
Create(ctx context.Context, users []*model.User) (err error)
|
||||||
// UpdateByMap update (zero value) external guarantee userID exists
|
// UpdateByMap update (zero value) external guarantee userID exists
|
||||||
@ -139,6 +142,10 @@ func (u *userDatabase) FindNotification(ctx context.Context, level int64) (users
|
|||||||
return u.userDB.TakeNotification(ctx, level)
|
return u.userDB.TakeNotification(ctx, level)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (u *userDatabase) FindSystemAccount(ctx context.Context) (users []*model.User, err error) {
|
||||||
|
return u.userDB.TakeGTEAppManagerLevel(ctx, constant.AppNotificationAdmin)
|
||||||
|
}
|
||||||
|
|
||||||
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage.
|
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage.
|
||||||
func (u *userDatabase) Create(ctx context.Context, users []*model.User) (err error) {
|
func (u *userDatabase) Create(ctx context.Context, users []*model.User) (err error) {
|
||||||
return u.tx.Transaction(ctx, func(ctx context.Context) error {
|
return u.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
|
@ -16,9 +16,10 @@ package mgo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/user"
|
"github.com/openimsdk/protocol/user"
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
@ -71,6 +72,10 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo
|
|||||||
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": level})
|
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": level})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) {
|
||||||
|
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}})
|
||||||
|
}
|
||||||
|
|
||||||
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {
|
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {
|
||||||
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"nickname": nickname})
|
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"nickname": nickname})
|
||||||
}
|
}
|
||||||
|
@ -16,10 +16,11 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/protocol/user"
|
"github.com/openimsdk/protocol/user"
|
||||||
"github.com/openimsdk/tools/db/pagination"
|
"github.com/openimsdk/tools/db/pagination"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type User interface {
|
type User interface {
|
||||||
@ -28,6 +29,7 @@ type User interface {
|
|||||||
Find(ctx context.Context, userIDs []string) (users []*model.User, err error)
|
Find(ctx context.Context, userIDs []string) (users []*model.User, err error)
|
||||||
Take(ctx context.Context, userID string) (user *model.User, err error)
|
Take(ctx context.Context, userID string) (user *model.User, err error)
|
||||||
TakeNotification(ctx context.Context, level int64) (user []*model.User, err error)
|
TakeNotification(ctx context.Context, level int64) (user []*model.User, err error)
|
||||||
|
TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error)
|
||||||
TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error)
|
TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error)
|
||||||
Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*model.User, err error)
|
Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*model.User, err error)
|
||||||
PageFindUser(ctx context.Context, level1 int64, level2 int64, pagination pagination.Pagination) (count int64, users []*model.User, err error)
|
PageFindUser(ctx context.Context, level1 int64, level2 int64, pagination pagination.Pagination) (count int64, users []*model.User, err error)
|
||||||
|
33
pkg/common/storage/kafka/config.go
Normal file
33
pkg/common/storage/kafka/config.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
type TLSConfig struct {
|
||||||
|
EnableTLS bool `yaml:"enableTLS"`
|
||||||
|
CACrt string `yaml:"caCrt"`
|
||||||
|
ClientCrt string `yaml:"clientCrt"`
|
||||||
|
ClientKey string `yaml:"clientKey"`
|
||||||
|
ClientKeyPwd string `yaml:"clientKeyPwd"`
|
||||||
|
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Username string `yaml:"username"`
|
||||||
|
Password string `yaml:"password"`
|
||||||
|
ProducerAck string `yaml:"producerAck"`
|
||||||
|
CompressType string `yaml:"compressType"`
|
||||||
|
Addr []string `yaml:"addr"`
|
||||||
|
TLS TLSConfig `yaml:"tls"`
|
||||||
|
}
|
68
pkg/common/storage/kafka/consumer_group.go
Normal file
68
pkg/common/storage/kafka/consumer_group.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
// Copyright © 2023 OpenIM. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MConsumerGroup struct {
|
||||||
|
sarama.ConsumerGroup
|
||||||
|
groupID string
|
||||||
|
topics []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
|
||||||
|
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
group, err := NewConsumerGroup(config, conf.Addr, groupID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &MConsumerGroup{
|
||||||
|
ConsumerGroup: group,
|
||||||
|
groupID: groupID,
|
||||||
|
topics: topics,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
|
||||||
|
return GetContextWithMQHeader(cMsg.Headers)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
|
||||||
|
for {
|
||||||
|
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
|
||||||
|
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) Close() error {
|
||||||
|
return mc.ConsumerGroup.Close()
|
||||||
|
}
|
82
pkg/common/storage/kafka/producer.go
Normal file
82
pkg/common/storage/kafka/producer.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
// Copyright © 2023 OpenIM. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Producer represents a Kafka producer.
|
||||||
|
type Producer struct {
|
||||||
|
addr []string
|
||||||
|
topic string
|
||||||
|
config *sarama.Config
|
||||||
|
producer sarama.SyncProducer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
|
||||||
|
producer, err := NewProducer(config, addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Producer{
|
||||||
|
addr: addr,
|
||||||
|
topic: topic,
|
||||||
|
config: config,
|
||||||
|
producer: producer,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message to the Kafka topic configured in the Producer.
|
||||||
|
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||||
|
// Marshal the protobuf message
|
||||||
|
bMsg, err := proto.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
|
||||||
|
}
|
||||||
|
if len(bMsg) == 0 {
|
||||||
|
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare Kafka message
|
||||||
|
kMsg := &sarama.ProducerMessage{
|
||||||
|
Topic: p.topic,
|
||||||
|
Key: sarama.StringEncoder(key),
|
||||||
|
Value: sarama.ByteEncoder(bMsg),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate message key and value
|
||||||
|
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||||
|
return 0, 0, errs.Wrap(errEmptyMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attach context metadata as headers
|
||||||
|
header, err := GetMQHeaderWithContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
kMsg.Headers = header
|
||||||
|
|
||||||
|
// Send the message
|
||||||
|
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error")
|
||||||
|
}
|
||||||
|
|
||||||
|
return partition, offset, nil
|
||||||
|
}
|
85
pkg/common/storage/kafka/sarama.go
Normal file
85
pkg/common/storage/kafka/sarama.go
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
|
||||||
|
kfk := sarama.NewConfig()
|
||||||
|
kfk.Version = sarama.V2_0_0_0
|
||||||
|
kfk.Consumer.Offsets.Initial = initial
|
||||||
|
kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
|
||||||
|
kfk.Consumer.Return.Errors = false
|
||||||
|
if conf.Username != "" || conf.Password != "" {
|
||||||
|
kfk.Net.SASL.Enable = true
|
||||||
|
kfk.Net.SASL.User = conf.Username
|
||||||
|
kfk.Net.SASL.Password = conf.Password
|
||||||
|
}
|
||||||
|
if conf.TLS.EnableTLS {
|
||||||
|
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kfk.Net.TLS.Config = tls
|
||||||
|
kfk.Net.TLS.Enable = true
|
||||||
|
}
|
||||||
|
return kfk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
|
||||||
|
cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
|
||||||
|
}
|
||||||
|
return cg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func BuildProducerConfig(conf Config) (*sarama.Config, error) {
|
||||||
|
kfk := sarama.NewConfig()
|
||||||
|
kfk.Producer.Return.Successes = true
|
||||||
|
kfk.Producer.Return.Errors = true
|
||||||
|
kfk.Producer.Partitioner = sarama.NewHashPartitioner
|
||||||
|
if conf.Username != "" || conf.Password != "" {
|
||||||
|
kfk.Net.SASL.Enable = true
|
||||||
|
kfk.Net.SASL.User = conf.Username
|
||||||
|
kfk.Net.SASL.Password = conf.Password
|
||||||
|
}
|
||||||
|
switch strings.ToLower(conf.ProducerAck) {
|
||||||
|
case "no_response":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.NoResponse
|
||||||
|
case "wait_for_local":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForLocal
|
||||||
|
case "wait_for_all":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
default:
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
}
|
||||||
|
if conf.CompressType == "" {
|
||||||
|
kfk.Producer.Compression = sarama.CompressionNone
|
||||||
|
} else {
|
||||||
|
if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if conf.TLS.EnableTLS {
|
||||||
|
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kfk.Net.TLS.Config = tls
|
||||||
|
kfk.Net.TLS.Enable = true
|
||||||
|
}
|
||||||
|
return kfk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
|
||||||
|
producer, err := sarama.NewSyncProducer(addr, conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
|
||||||
|
}
|
||||||
|
return producer, nil
|
||||||
|
}
|
83
pkg/common/storage/kafka/tls.go
Normal file
83
pkg/common/storage/kafka/tls.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"encoding/pem"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// decryptPEM decrypts a PEM block using a password.
|
||||||
|
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
|
||||||
|
if len(passphrase) == 0 {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
b, _ := pem.Decode(data)
|
||||||
|
d, err := x509.DecryptPEMBlock(b, passphrase)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "DecryptPEMBlock failed")
|
||||||
|
}
|
||||||
|
return pem.EncodeToMemory(&pem.Block{
|
||||||
|
Type: b.Type,
|
||||||
|
Bytes: d,
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "path", path)
|
||||||
|
}
|
||||||
|
return decryptPEM(data, pwd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTLSConfig setup the TLS config from general config file.
|
||||||
|
func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) {
|
||||||
|
var tlsConfig tls.Config
|
||||||
|
if clientCertFile != "" && clientKeyFile != "" {
|
||||||
|
certPEMBlock, err := os.ReadFile(clientCertFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile)
|
||||||
|
}
|
||||||
|
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "X509KeyPair failed")
|
||||||
|
}
|
||||||
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||||
|
}
|
||||||
|
|
||||||
|
if caCertFile != "" {
|
||||||
|
caCert, err := os.ReadFile(caCertFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile)
|
||||||
|
}
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||||
|
return nil, errs.New("AppendCertsFromPEM failed")
|
||||||
|
}
|
||||||
|
tlsConfig.RootCAs = caCertPool
|
||||||
|
}
|
||||||
|
tlsConfig.InsecureSkipVerify = insecureSkipVerify
|
||||||
|
return &tlsConfig, nil
|
||||||
|
}
|
34
pkg/common/storage/kafka/util.go
Normal file
34
pkg/common/storage/kafka/util.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errEmptyMsg = errors.New("kafka binary msg is empty")
|
||||||
|
|
||||||
|
// GetMQHeaderWithContext extracts message queue headers from the context.
|
||||||
|
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||||
|
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return []sarama.RecordHeader{
|
||||||
|
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
|
||||||
|
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
||||||
|
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||||
|
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContextWithMQHeader creates a context from message queue headers.
|
||||||
|
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||||
|
var values []string
|
||||||
|
for _, recordHeader := range header {
|
||||||
|
values = append(values, string(recordHeader.Value))
|
||||||
|
}
|
||||||
|
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
|
||||||
|
}
|
79
pkg/common/storage/kafka/verify.go
Normal file
79
pkg/common/storage/kafka/verify.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
|
||||||
|
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cli, err := sarama.NewClient(conf.Addr, kfk)
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
existingTopics, err := cli.Topics()
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "Failed to list topics")
|
||||||
|
}
|
||||||
|
|
||||||
|
existingTopicsMap := make(map[string]bool)
|
||||||
|
for _, t := range existingTopics {
|
||||||
|
existingTopicsMap[t] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, topic := range topics {
|
||||||
|
if !existingTopicsMap[topic] {
|
||||||
|
return errs.New("topic not exist", "topic", topic).Wrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CheckHealth(ctx context.Context, conf *Config) error {
|
||||||
|
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cli, err := sarama.NewClient(conf.Addr, kfk)
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
// Get broker list
|
||||||
|
brokers := cli.Brokers()
|
||||||
|
if len(brokers) == 0 {
|
||||||
|
return errs.New("no brokers found").Wrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all brokers are reachable
|
||||||
|
for _, broker := range brokers {
|
||||||
|
if err := broker.Open(kfk); err != nil {
|
||||||
|
return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -17,6 +17,9 @@ package webhook
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
@ -25,7 +28,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/mq/memamq"
|
"github.com/openimsdk/tools/mq/memamq"
|
||||||
"github.com/openimsdk/tools/utils/httputil"
|
"github.com/openimsdk/tools/utils/httputil"
|
||||||
"net/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
@ -37,6 +39,8 @@ type Client struct {
|
|||||||
const (
|
const (
|
||||||
webhookWorkerCount = 2
|
webhookWorkerCount = 2
|
||||||
webhookBufferSize = 100
|
webhookBufferSize = 100
|
||||||
|
|
||||||
|
Key = "key"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewWebhookClient(url string, options ...*memamq.MemoryQueue) *Client {
|
func NewWebhookClient(url string, options ...*memamq.MemoryQueue) *Client {
|
||||||
@ -66,6 +70,12 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) AsyncPostWithQuery(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig, queryParams map[string]string) {
|
||||||
|
if after.Enable {
|
||||||
|
c.queue.Push(func() { c.postWithQuery(ctx, command, req, resp, after.Timeout, queryParams) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error {
|
func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error {
|
||||||
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
|
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
|
||||||
fullURL := c.url + "/" + command
|
fullURL := c.url + "/" + command
|
||||||
@ -84,3 +94,41 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou
|
|||||||
log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
|
log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) postWithQuery(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int, queryParams map[string]string) error {
|
||||||
|
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
|
||||||
|
fullURL := c.url + "/" + command
|
||||||
|
|
||||||
|
parsedURL, err := url.Parse(fullURL)
|
||||||
|
if err != nil {
|
||||||
|
return servererrs.ErrNetwork.WrapMsg(err.Error(), "failed to parse URL", fullURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := parsedURL.Query()
|
||||||
|
|
||||||
|
operationID, _ := ctx.Value(constant.OperationID).(string)
|
||||||
|
|
||||||
|
for key, value := range queryParams {
|
||||||
|
query.Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedURL.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
fullURL = parsedURL.String()
|
||||||
|
log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
|
||||||
|
|
||||||
|
b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout)
|
||||||
|
if err != nil {
|
||||||
|
return servererrs.ErrNetwork.WrapMsg(err.Error(), "post url", fullURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = json.Unmarshal(b, output); err != nil {
|
||||||
|
return servererrs.ErrData.WithDetail(err.Error() + " response format error")
|
||||||
|
}
|
||||||
|
if err := output.Parse(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
65
test/webhook/msgmodify/main.go
Normal file
65
test/webhook/msgmodify/main.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
g := gin.Default()
|
||||||
|
g.POST("/callbackExample/callbackBeforeMsgModifyCommand", toGin(handlerMsg))
|
||||||
|
if err := g.Run(":10006"); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func toGin[R any](fn func(c *gin.Context, req *R)) gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
body, err := io.ReadAll(c.Request.Body)
|
||||||
|
if err != nil {
|
||||||
|
c.String(http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body)
|
||||||
|
var req R
|
||||||
|
if err := json.Unmarshal(body, &req); err != nil {
|
||||||
|
c.String(http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fn(c, &req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handlerMsg(c *gin.Context, req *cbapi.CallbackMsgModifyCommandReq) {
|
||||||
|
var resp cbapi.CallbackMsgModifyCommandResp
|
||||||
|
if req.ContentType != constant.Text {
|
||||||
|
c.JSON(http.StatusOK, &resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var textElem struct {
|
||||||
|
Content string `json:"content"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal([]byte(req.Content), &textElem); err != nil {
|
||||||
|
c.String(http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const word = "xxx"
|
||||||
|
if strings.Contains(textElem.Content, word) {
|
||||||
|
textElem.Content = strings.ReplaceAll(textElem.Content, word, strings.Repeat("*", len(word)))
|
||||||
|
content, err := json.Marshal(&textElem)
|
||||||
|
if err != nil {
|
||||||
|
c.String(http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tmp := string(content)
|
||||||
|
resp.Content = &tmp
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, &resp)
|
||||||
|
}
|
@ -25,11 +25,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
"github.com/openimsdk/tools/db/redisutil"
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
"github.com/openimsdk/tools/discovery/etcd"
|
"github.com/openimsdk/tools/discovery/etcd"
|
||||||
"github.com/openimsdk/tools/discovery/zookeeper"
|
"github.com/openimsdk/tools/discovery/zookeeper"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
|
||||||
"github.com/openimsdk/tools/s3/minio"
|
"github.com/openimsdk/tools/s3/minio"
|
||||||
"github.com/openimsdk/tools/system/program"
|
"github.com/openimsdk/tools/system/program"
|
||||||
"github.com/openimsdk/tools/utils/runtimeenv"
|
"github.com/openimsdk/tools/utils/runtimeenv"
|
||||||
@ -84,7 +84,7 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka,
|
|||||||
discovery = &config.Discovery{}
|
discovery = &config.Discovery{}
|
||||||
thirdConfig = &config.Third{}
|
thirdConfig = &config.Third{}
|
||||||
)
|
)
|
||||||
runtimeEnv := runtimeenv.PrintRuntimeEnvironment()
|
runtimeEnv := runtimeenv.RuntimeEnvironment()
|
||||||
|
|
||||||
err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig)
|
err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -43,7 +43,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func readConfig[T any](dir string, name string) (*T, error) {
|
func readConfig[T any](dir string, name string) (*T, error) {
|
||||||
if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES {
|
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
|
||||||
dir = os.Getenv(config.MountConfigFilePath)
|
dir = os.Getenv(config.MountConfigFilePath)
|
||||||
}
|
}
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user