mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
feat: implement createConversations webhook function.
This commit is contained in:
parent
268e1df168
commit
9d79f05d7e
@ -181,3 +181,19 @@ afterImportFriends:
|
|||||||
afterRemoveBlack:
|
afterRemoveBlack:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
|
beforeCreateSingleChatConversations:
|
||||||
|
enable: false
|
||||||
|
timeout: 5
|
||||||
|
failedContinue: false
|
||||||
|
afterCreateSingleChatConversations:
|
||||||
|
enable: false
|
||||||
|
timeout: 5
|
||||||
|
failedContinue: false
|
||||||
|
beforeCreateGroupChatConversations:
|
||||||
|
enable: false
|
||||||
|
timeout: 5
|
||||||
|
failedContinue: false
|
||||||
|
afterCreateGroupChatConversations:
|
||||||
|
enable: false
|
||||||
|
timeout: 5
|
||||||
|
failedContinue: false
|
||||||
|
80
internal/rpc/conversation/callback.go
Normal file
80
internal/rpc/conversation/callback.go
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
package conversation
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *pbconversation.CreateSingleChatConversationsReq) error {
|
||||||
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
|
cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{
|
||||||
|
CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand,
|
||||||
|
RecvID: req.RecvID,
|
||||||
|
SendID: req.SendID,
|
||||||
|
ConversationID: req.ConversationID,
|
||||||
|
ConversationType: req.ConversationType,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{}
|
||||||
|
|
||||||
|
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
datautil.NotNilReplace(&req.RecvID, resp.RecvID)
|
||||||
|
datautil.NotNilReplace(&req.SendID, resp.SendID)
|
||||||
|
datautil.NotNilReplace(&req.ConversationID, resp.ConversationID)
|
||||||
|
datautil.NotNilReplace(&req.ConversationType, resp.ConversationType)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *pbconversation.CreateSingleChatConversationsReq) error {
|
||||||
|
cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{
|
||||||
|
CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand,
|
||||||
|
RecvID: req.RecvID,
|
||||||
|
SendID: req.SendID,
|
||||||
|
ConversationID: req.ConversationID,
|
||||||
|
ConversationType: req.ConversationType,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *pbconversation.CreateGroupChatConversationsReq) error {
|
||||||
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
|
cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{
|
||||||
|
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand,
|
||||||
|
UserIDs: req.UserIDs,
|
||||||
|
GroupID: req.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{}
|
||||||
|
|
||||||
|
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
datautil.NotNilReplace(&req.UserIDs, resp.UserIDs)
|
||||||
|
datautil.NotNilReplace(&req.GroupID, resp.GroupID)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *pbconversation.CreateGroupChatConversationsReq) error {
|
||||||
|
cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{
|
||||||
|
CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand,
|
||||||
|
UserIDs: req.UserIDs,
|
||||||
|
GroupID: req.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after)
|
||||||
|
return nil
|
||||||
|
}
|
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
@ -49,6 +50,7 @@ type conversationServer struct {
|
|||||||
conversationNotificationSender *ConversationNotificationSender
|
conversationNotificationSender *ConversationNotificationSender
|
||||||
config *Config
|
config *Config
|
||||||
|
|
||||||
|
webhookClient *webhook.Client
|
||||||
userClient *rpcli.UserClient
|
userClient *rpcli.UserClient
|
||||||
msgClient *rpcli.MsgClient
|
msgClient *rpcli.MsgClient
|
||||||
groupClient *rpcli.GroupClient
|
groupClient *rpcli.GroupClient
|
||||||
@ -60,6 +62,7 @@ type Config struct {
|
|||||||
MongodbConfig config.Mongo
|
MongodbConfig config.Mongo
|
||||||
NotificationConfig config.Notification
|
NotificationConfig config.Notification
|
||||||
Share config.Share
|
Share config.Share
|
||||||
|
WebhooksConfig config.Webhooks
|
||||||
LocalCacheConfig config.LocalCache
|
LocalCacheConfig config.LocalCache
|
||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
}
|
}
|
||||||
@ -90,16 +93,25 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
msgClient := rpcli.NewMsgClient(msgConn)
|
msgClient := rpcli.NewMsgClient(msgConn)
|
||||||
localcache.InitLocalCache(&config.LocalCacheConfig)
|
|
||||||
pbconversation.RegisterConversationServer(server, &conversationServer{
|
cs := conversationServer{
|
||||||
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
|
config: config,
|
||||||
conversationDatabase: controller.NewConversationDatabase(conversationDB,
|
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||||
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
|
|
||||||
userClient: rpcli.NewUserClient(userConn),
|
userClient: rpcli.NewUserClient(userConn),
|
||||||
groupClient: rpcli.NewGroupClient(groupConn),
|
groupClient: rpcli.NewGroupClient(groupConn),
|
||||||
msgClient: msgClient,
|
msgClient: msgClient,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient)
|
||||||
|
cs.conversationDatabase = controller.NewConversationDatabase(
|
||||||
|
conversationDB,
|
||||||
|
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB),
|
||||||
|
mgocli.GetTx())
|
||||||
|
|
||||||
|
localcache.InitLocalCache(&config.LocalCacheConfig)
|
||||||
|
pbconversation.RegisterConversationServer(server, &cs)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,6 +338,10 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req
|
|||||||
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
||||||
req *pbconversation.CreateSingleChatConversationsReq,
|
req *pbconversation.CreateSingleChatConversationsReq,
|
||||||
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
||||||
|
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, req); err != nil && err != servererrs.ErrCallbackContinue {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
switch req.ConversationType {
|
switch req.ConversationType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
var conversation dbModel.Conversation
|
var conversation dbModel.Conversation
|
||||||
@ -357,10 +373,23 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reqCallbackAfter := &pbconversation.CreateSingleChatConversationsReq{
|
||||||
|
RecvID: req.RecvID,
|
||||||
|
SendID: req.SendID,
|
||||||
|
ConversationID: req.ConversationID,
|
||||||
|
ConversationType: req.ConversationType,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, reqCallbackAfter)
|
||||||
|
|
||||||
return &pbconversation.CreateSingleChatConversationsResp{}, nil
|
return &pbconversation.CreateSingleChatConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
|
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
|
||||||
|
if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs)
|
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -369,6 +398,13 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
|
|||||||
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
|
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reqCallBackAfter := &pbconversation.CreateGroupChatConversationsReq{
|
||||||
|
UserIDs: req.UserIDs,
|
||||||
|
GroupID: req.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, reqCallBackAfter)
|
||||||
return &pbconversation.CreateGroupChatConversationsResp{}, nil
|
return &pbconversation.CreateGroupChatConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,4 +62,8 @@ const (
|
|||||||
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
|
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
|
||||||
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
|
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
|
||||||
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
|
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
|
||||||
|
CallbackBeforeCreateSingleChatConversationsCommand = "callbackBeforeCreateSingleChatConversationsCommand"
|
||||||
|
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
|
||||||
|
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
|
||||||
|
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
|
||||||
)
|
)
|
||||||
|
51
pkg/callbackstruct/conversation.go
Normal file
51
pkg/callbackstruct/conversation.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package callbackstruct
|
||||||
|
|
||||||
|
type CallbackBeforeCreateSingleChatConversationsReq struct {
|
||||||
|
CallbackCommand `json:"callbackCommand"`
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
SendID string `json:"sendID"`
|
||||||
|
ConversationID string `json:"conversationID"`
|
||||||
|
ConversationType int32 `json:"conversationType"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeCreateSingleChatConversationsResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
RecvID *string `json:"recvID"`
|
||||||
|
SendID *string `json:"sendID"`
|
||||||
|
ConversationID *string `json:"conversationID"`
|
||||||
|
ConversationType *int32 `json:"conversationType"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterCreateSingleChatConversationsReq struct {
|
||||||
|
CallbackCommand `json:"callbackCommand"`
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
SendID string `json:"sendID"`
|
||||||
|
ConversationID string `json:"conversationID"`
|
||||||
|
ConversationType int32 `json:"conversationType"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterCreateSingleChatConversationsResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeCreateGroupChatConversationsReq struct {
|
||||||
|
CallbackCommand `json:"callbackCommand"`
|
||||||
|
UserIDs []string `json:"userIDs"`
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeCreateGroupChatConversationsResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
UserIDs *[]string `json:"userIDs"`
|
||||||
|
GroupID *string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterCreateGroupChatConversationsReq struct {
|
||||||
|
CallbackCommand `json:"callbackCommand"`
|
||||||
|
UserIDs []string `json:"userIDs"`
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterCreateGroupChatConversationsResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
@ -435,6 +435,10 @@ type Webhooks struct {
|
|||||||
BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"`
|
BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"`
|
||||||
AfterImportFriends AfterConfig `yaml:"afterImportFriends"`
|
AfterImportFriends AfterConfig `yaml:"afterImportFriends"`
|
||||||
AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"`
|
AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"`
|
||||||
|
BeforeCreateSingleChatConversations BeforeConfig `yaml:"beforeCreateSingleChatConversations"`
|
||||||
|
AfterCreateSingleChatConversations AfterConfig `yaml:"afterCreateSingleChatConversations"`
|
||||||
|
BeforeCreateGroupChatConversations BeforeConfig `yaml:"beforeCreateGroupChatConversations"`
|
||||||
|
AfterCreateGroupChatConversations AfterConfig `yaml:"afterCreateGroupChatConversations"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ZooKeeper struct {
|
type ZooKeeper struct {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user