diff --git a/config/webhooks.yml b/config/webhooks.yml index 9fd3eb339..c1645e7c8 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -41,6 +41,9 @@ afterSendGroupMsg: attentionIds: [] # See beforeSendSingleMsg comment. deniedTypes: [] +afterMsgSaveDB: + enable: false + timeout: 5 afterUserOnline: enable: false timeout: 5 diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go index f0d439779..575efb4ae 100644 --- a/internal/msgtransfer/callback.go +++ b/internal/msgtransfer/callback.go @@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string { } } -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return - } - +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { if !filterAfterMsg(msg, after) { return } - cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), - RecvID: msg.RecvID, - } - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) -} - -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return + cbReq := &cbapi.CallbackAfterMsgSaveDBReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand), } - if !filterAfterMsg(msg, after) { - return + switch msg.SessionType { + case constant.SingleChatType, constant.NotificationChatType: + cbReq.RecvID = msg.RecvID + case constant.ReadGroupChatType: + cbReq.GroupID = msg.GroupID + default: } - cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), - GroupID: msg.GroupID, - } - - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg)) } func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8611af7ea..147bd37b0 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,7 +15,6 @@ package msgtransfer import ( - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { - log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() @@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) } for _, msgData := range msgFromMQ.MsgData { - switch msgData.SessionType { - case constant.SingleChatType: - mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) - case constant.ReadGroupChatType: - mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) - } + mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData) } //var seqs []int64 diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 2c00efd43..49fb04477 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,8 +16,10 @@ package msg import ( "context" + "encoding/base64" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/tools/errs" @@ -28,6 +30,7 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) @@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf } // Move to msgtransfer -// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), -// RecvID: msg.MsgData.RecvID, -// } -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } +func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.MsgData.RecvID, + } + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi }) } -// Move to msgtransfer -// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), -// GroupID: msg.MsgData.GroupID, -// } +func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.MsgData.GroupID, + } -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } -// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { -// keyMsgData := apistruct.KeyMsgData{ -// SendID: msg.SendID, -// RecvID: msg.RecvID, -// GroupID: msg.GroupID, -// } +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } -// return map[string]string{ -// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), -// } -// } + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index d97905bff..18ad9cc56 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} @@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq return nil, err } - // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index bbc34e71f..ef7cb502f 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -66,4 +66,5 @@ const ( CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" + CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand" ) diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index 902fa6110..ef8a8bb28 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct { type CallbackSingleMsgReadResp struct { CommonCallbackResp } + +type CallbackAfterMsgSaveDBReq struct { + CommonCallbackReq + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + +type CallbackAfterMsgSaveDBResp struct { + CommonCallbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index c5000a6e5..856cbf3ec 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -436,6 +436,7 @@ type Webhooks struct { BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` + AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"` AfterUserOnline AfterConfig `yaml:"afterUserOnline"` AfterUserOffline AfterConfig `yaml:"afterUserOffline"` AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`