diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go new file mode 100644 index 000000000..ea51c2839 --- /dev/null +++ b/internal/msgtransfer/callback.go @@ -0,0 +1,124 @@ +package msgtransfer + +import ( + "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" + "google.golang.org/protobuf/proto" + + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" +) + +func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq { + return cbapi.CommonCallbackReq{ + SendID: msg.SendID, + ServerMsgID: msg.ServerMsgID, + CallbackCommand: command, + ClientMsgID: msg.ClientMsgID, + OperationID: mcontext.GetOperationID(ctx), + SenderPlatformID: msg.SenderPlatformID, + SenderNickname: msg.SenderNickname, + SessionType: msg.SessionType, + MsgFrom: msg.MsgFrom, + ContentType: msg.ContentType, + Status: msg.Status, + SendTime: msg.SendTime, + CreateTime: msg.CreateTime, + AtUserIDList: msg.AtUserIDList, + SenderFaceURL: msg.SenderFaceURL, + Content: GetContent(msg), + Seq: uint32(msg.Seq), + Ex: msg.Ex, + } +} + +func GetContent(msg *sdkws.MsgData) string { + if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { + var tips sdkws.TipsComm + _ = proto.Unmarshal(msg.Content, &tips) + content := tips.JsonDetail + return content + } else { + return string(msg.Content) + } +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.RecvID, + } + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.GroupID, + } + + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} + +func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool { + return filterMsg(msg, after.AttentionIds, after.DeniedTypes) +} + +func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool { + // According to the attentionIds configuration, only some users are sent + if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) { + return false + } + + if defaultDeniedTypes(msg.ContentType) { + return false + } + + if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) { + return false + } + + return true +} + +func defaultDeniedTypes(contentType int32) bool { + if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { + return true + } + if contentType == constant.Typing { + return true + } + return false +} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 175813552..b07ec6f1d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a895bb9c4..6c1498f82 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,6 +19,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -26,11 +28,15 @@ import ( type OnlineHistoryMongoConsumerHandler struct { msgTransferDatabase controller.MsgTransferDatabase + config *Config + webhookClient *webhook.Client } -func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { +func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler { return &OnlineHistoryMongoConsumerHandler{ msgTransferDatabase: database, + config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } } @@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() } + + for _, msgData := range msgFromMQ.MsgData { + switch msgData.SessionType { + case constant.SingleChatType: + mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) + case constant.ReadGroupChatType: + mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) + } + } + //var seqs []int64 //for _, msg := range msgFromMQ.MsgData { // seqs = append(seqs, msg.Seq) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f13b80708..d97905bff 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime @@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + + // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID,