feat: add msgDBSave webhook when data save to DB.

This commit is contained in:
mo3et 2025-09-26 12:02:28 +08:00
parent bf0289075b
commit fce3c9d9ba
8 changed files with 66 additions and 66 deletions

View File

@ -41,6 +41,9 @@ afterSendGroupMsg:
attentionIds: [] attentionIds: []
# See beforeSendSingleMsg comment. # See beforeSendSingleMsg comment.
deniedTypes: [] deniedTypes: []
afterMsgSaveDB:
enable: false
timeout: 5
afterUserOnline: afterUserOnline:
enable: false enable: false
timeout: 5 timeout: 5

View File

@ -51,7 +51,7 @@ func GetContent(msg *sdkws.MsgData) string {
} }
} }
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing { if msg.ContentType == constant.Typing {
return return
} }
@ -60,28 +60,17 @@ func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx conte
return return
} }
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ cbReq := &cbapi.CallbackAfterMsgSaveDBReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand),
RecvID: msg.RecvID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
} }
if !filterAfterMsg(msg, after) { if msg.RecvID != "" {
return cbReq.RecvID = msg.RecvID
} else if msg.GroupID != "" {
cbReq.GroupID = msg.GroupID
} }
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg))
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
GroupID: msg.GroupID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
} }
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {

View File

@ -15,7 +15,6 @@
package msgtransfer package msgtransfer
import ( import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/mq"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil { if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
prommetrics.MsgInsertMongoFailedCounter.Inc() prommetrics.MsgInsertMongoFailedCounter.Inc()
} else { } else {
prommetrics.MsgInsertMongoSuccessCounter.Inc() prommetrics.MsgInsertMongoSuccessCounter.Inc()
@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
} }
for _, msgData := range msgFromMQ.MsgData { for _, msgData := range msgFromMQ.MsgData {
switch msgData.SessionType { mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData)
case constant.SingleChatType:
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
case constant.ReadGroupChatType:
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
}
} }
//var seqs []int64 //var seqs []int64

View File

@ -16,8 +16,10 @@ package msg
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
@ -28,6 +30,7 @@ import (
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf
} }
// Move to msgtransfer // Move to msgtransfer
// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
// if msg.MsgData.ContentType == constant.Typing { if msg.MsgData.ContentType == constant.Typing {
// return return
// } }
// if !filterAfterMsg(msg, after) { if !filterAfterMsg(msg, after) {
// return return
// } }
// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
// RecvID: msg.MsgData.RecvID, RecvID: msg.MsgData.RecvID,
// } }
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
// } }
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi
}) })
} }
// Move to msgtransfer func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { if msg.MsgData.ContentType == constant.Typing {
// if msg.MsgData.ContentType == constant.Typing { return
// return }
// } if !filterAfterMsg(msg, after) {
// if !filterAfterMsg(msg, after) { return
// return }
// } cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID,
// GroupID: msg.MsgData.GroupID, }
// }
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
// } }
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
} }
// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
// keyMsgData := apistruct.KeyMsgData{ keyMsgData := apistruct.KeyMsgData{
// SendID: msg.SendID, SendID: msg.SendID,
// RecvID: msg.RecvID, RecvID: msg.RecvID,
// GroupID: msg.GroupID, GroupID: msg.GroupID,
// } }
// return map[string]string{ return map[string]string{
// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
// } }
// } }

View File

@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
go m.setConversationAtInfo(ctx, req.MsgData) go m.setConversationAtInfo(ctx, req.MsgData)
} }
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
prommetrics.GroupChatMsgProcessSuccessCounter.Inc() prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{} resp = &pbmsg.SendMsgResp{}
@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
return nil, err return nil, err
} }
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
prommetrics.SingleChatMsgProcessSuccessCounter.Inc() prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
return &pbmsg.SendMsgResp{ return &pbmsg.SendMsgResp{
ServerMsgID: req.MsgData.ServerMsgID, ServerMsgID: req.MsgData.ServerMsgID,

View File

@ -66,4 +66,5 @@ const (
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand"
) )

View File

@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct {
type CallbackSingleMsgReadResp struct { type CallbackSingleMsgReadResp struct {
CommonCallbackResp CommonCallbackResp
} }
type CallbackAfterMsgSaveDBReq struct {
CommonCallbackReq
RecvID string `json:"recvID"`
GroupID string `json:"groupID"`
}
type CallbackAfterMsgSaveDBResp struct {
CommonCallbackResp
}

View File

@ -436,6 +436,7 @@ type Webhooks struct {
BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"`
BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"`
AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"`
AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"`
AfterUserOnline AfterConfig `yaml:"afterUserOnline"` AfterUserOnline AfterConfig `yaml:"afterUserOnline"`
AfterUserOffline AfterConfig `yaml:"afterUserOffline"` AfterUserOffline AfterConfig `yaml:"afterUserOffline"`
AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`