mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-18 17:09:17 +08:00
fix: solve incorrect when sendMsg webhook callback after. (#3409)
* fix: solve incorrect when sendMsg webhook callback after. * remove print.
This commit is contained in:
parent
b7ca3bd95f
commit
04ee509b68
124
internal/msgtransfer/callback.go
Normal file
124
internal/msgtransfer/callback.go
Normal file
@ -0,0 +1,124 @@
|
||||
package msgtransfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
)
|
||||
|
||||
func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq {
|
||||
return cbapi.CommonCallbackReq{
|
||||
SendID: msg.SendID,
|
||||
ServerMsgID: msg.ServerMsgID,
|
||||
CallbackCommand: command,
|
||||
ClientMsgID: msg.ClientMsgID,
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
SenderPlatformID: msg.SenderPlatformID,
|
||||
SenderNickname: msg.SenderNickname,
|
||||
SessionType: msg.SessionType,
|
||||
MsgFrom: msg.MsgFrom,
|
||||
ContentType: msg.ContentType,
|
||||
Status: msg.Status,
|
||||
SendTime: msg.SendTime,
|
||||
CreateTime: msg.CreateTime,
|
||||
AtUserIDList: msg.AtUserIDList,
|
||||
SenderFaceURL: msg.SenderFaceURL,
|
||||
Content: GetContent(msg),
|
||||
Seq: uint32(msg.Seq),
|
||||
Ex: msg.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func GetContent(msg *sdkws.MsgData) string {
|
||||
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
|
||||
var tips sdkws.TipsComm
|
||||
_ = proto.Unmarshal(msg.Content, &tips)
|
||||
content := tips.JsonDetail
|
||||
return content
|
||||
} else {
|
||||
return string(msg.Content)
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return
|
||||
}
|
||||
if !filterAfterMsg(msg, after) {
|
||||
return
|
||||
}
|
||||
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||
RecvID: msg.RecvID,
|
||||
}
|
||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return
|
||||
}
|
||||
if !filterAfterMsg(msg, after) {
|
||||
return
|
||||
}
|
||||
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||
GroupID: msg.GroupID,
|
||||
}
|
||||
|
||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||
}
|
||||
|
||||
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||
keyMsgData := apistruct.KeyMsgData{
|
||||
SendID: msg.SendID,
|
||||
RecvID: msg.RecvID,
|
||||
GroupID: msg.GroupID,
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||
}
|
||||
}
|
||||
|
||||
func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool {
|
||||
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
|
||||
}
|
||||
|
||||
func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool {
|
||||
// According to the attentionIds configuration, only some users are sent
|
||||
if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) {
|
||||
return false
|
||||
}
|
||||
|
||||
if defaultDeniedTypes(msg.ContentType) {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func defaultDeniedTypes(contentType int32) bool {
|
||||
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
|
||||
return true
|
||||
}
|
||||
if contentType == constant.Typing {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase)
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config)
|
||||
|
||||
msgTransfer := &MsgTransfer{
|
||||
historyConsumer: historyConsumer,
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbmsg "github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -26,11 +28,15 @@ import (
|
||||
|
||||
type OnlineHistoryMongoConsumerHandler struct {
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
config *Config
|
||||
webhookClient *webhook.Client
|
||||
}
|
||||
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler {
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler {
|
||||
return &OnlineHistoryMongoConsumerHandler{
|
||||
msgTransferDatabase: database,
|
||||
config: config,
|
||||
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
|
||||
} else {
|
||||
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||
}
|
||||
|
||||
for _, msgData := range msgFromMQ.MsgData {
|
||||
switch msgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
|
||||
case constant.ReadGroupChatType:
|
||||
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
|
||||
}
|
||||
}
|
||||
|
||||
//var seqs []int64
|
||||
//for _, msg := range msgFromMQ.MsgData {
|
||||
// seqs = append(seqs, msg.Seq)
|
||||
|
@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
|
||||
go m.setConversationAtInfo(ctx, req.MsgData)
|
||||
}
|
||||
|
||||
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||
|
||||
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
||||
resp = &pbmsg.SendMsgResp{}
|
||||
resp.SendTime = req.MsgData.SendTime
|
||||
@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||
return nil, err
|
||||
}
|
||||
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||
|
||||
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
||||
return &pbmsg.SendMsgResp{
|
||||
ServerMsgID: req.MsgData.ServerMsgID,
|
||||
|
Loading…
x
Reference in New Issue
Block a user