From 883ea4dcb9880e4345c45706c31503d16c05b830 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Wed, 5 Mar 2025 17:04:57 +0800 Subject: [PATCH] feat: sending messages supports returning fields modified by webhook (#3192) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted * feat: optimize the default notification.yml * fix: shouldPushOffline * fix: the sorting is wrong after canceling the administrator in group settings * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook --- go.mod | 4 +- go.sum | 12 +-- internal/api/msg.go | 161 ++++++++++++++++++++++++++++++++- internal/rpc/msg/callback.go | 21 ++++- internal/rpc/msg/send.go | 57 ++++++++---- pkg/apistruct/manage.go | 12 +++ pkg/apistruct/msg_test.go | 1 - test/webhook/msgmodify/main.go | 65 +++++++++++++ 8 files changed, 297 insertions(+), 36 deletions(-) delete mode 100644 pkg/apistruct/msg_test.go create mode 100644 test/webhook/msgmodify/main.go diff --git a/go.mod b/go.mod index 9a66d7cea..0a9de4010 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.72 + github.com/openimsdk/protocol v0.0.72-alpha.79 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6bd4ecde0..2a86d97ea 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY= -github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= +github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/msg.go b/internal/api/msg.go index 0b73af0cb..e06d5a7a4 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -15,9 +15,15 @@ package api import ( + "encoding/base64" + "encoding/json" + "sync" + "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" + "google.golang.org/protobuf/reflect/protoreflect" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -36,6 +42,39 @@ import ( "github.com/openimsdk/tools/utils/timeutil" ) +var ( + msgDataDescriptor []protoreflect.FieldDescriptor + msgDataDescriptorOnce sync.Once +) + +func getMsgDataDescriptor() []protoreflect.FieldDescriptor { + msgDataDescriptorOnce.Do(func() { + skip := make(map[string]struct{}) + respFields := new(msg.SendMsgResp).ProtoReflect().Descriptor().Fields() + for i := 0; i < respFields.Len(); i++ { + field := respFields.Get(i) + if !field.HasJSONName() { + continue + } + skip[field.JSONName()] = struct{}{} + } + fields := new(sdkws.MsgData).ProtoReflect().Descriptor().Fields() + num := fields.Len() + msgDataDescriptor = make([]protoreflect.FieldDescriptor, 0, num) + for i := 0; i < num; i++ { + field := fields.Get(i) + if !field.HasJSONName() { + continue + } + if _, ok := skip[field.JSONName()]; ok { + continue + } + msgDataDescriptor = append(msgDataDescriptor, fields.Get(i)) + } + }) + return msgDataDescriptor +} + type MessageApi struct { Client msg.MsgClient userClient *rpcli.UserClient @@ -190,6 +229,42 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM return m.newUserSendMsgReq(c, &req), nil } +func (m *MessageApi) getModifyFields(req, respModify *sdkws.MsgData) map[string]any { + if req == nil || respModify == nil { + return nil + } + fields := make(map[string]any) + reqProtoReflect := req.ProtoReflect() + respProtoReflect := respModify.ProtoReflect() + for _, descriptor := range getMsgDataDescriptor() { + reqValue := reqProtoReflect.Get(descriptor) + respValue := respProtoReflect.Get(descriptor) + if !reqValue.Equal(respValue) { + val := respValue.Interface() + name := descriptor.JSONName() + if name == "content" { + if bs, ok := val.([]byte); ok { + val = string(bs) + } + } + fields[name] = val + } + } + if len(fields) == 0 { + fields = nil + } + return fields +} + +func (m *MessageApi) ginRespSendMsg(c *gin.Context, req *msg.SendMsgReq, resp *msg.SendMsgResp) { + res := m.getModifyFields(req.MsgData, resp.Modify) + resp.Modify = nil + apiresp.GinSuccess(c, &apistruct.SendMsgResp{ + SendMsgResp: resp, + Modify: res, + }) +} + // SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework. func (m *MessageApi) SendMessage(c *gin.Context) { // Initialize a request struct for sending a message. @@ -243,7 +318,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) { } // Respond with a success message and the response payload. - apiresp.GinSuccess(c, respPb) + m.ginRespSendMsg(c, sendMsgReq, respPb) } func (m *MessageApi) SendBusinessNotification(c *gin.Context) { @@ -309,7 +384,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { apiresp.GinError(c, err) return } - apiresp.GinSuccess(c, respPb) + m.ginRespSendMsg(c, &sendMsgReq, respPb) } func (m *MessageApi) BatchSendMsg(c *gin.Context) { @@ -363,11 +438,93 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { ClientMsgID: rpcResp.ClientMsgID, SendTime: rpcResp.SendTime, RecvID: recvID, + Modify: m.getModifyFields(sendMsgReq.MsgData, rpcResp.Modify), }) } apiresp.GinSuccess(c, resp) } +func (m *MessageApi) SendSimpleMessage(c *gin.Context) { + encodedKey, ok := c.GetQuery(webhook.Key) + if !ok { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap()) + return + } + + decodedData, err := base64.StdEncoding.DecodeString(encodedKey) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var ( + req apistruct.SendSingleMsgReq + keyMsgData apistruct.KeyMsgData + + sendID string + sessionType int32 + recvID string + ) + err = json.Unmarshal(decodedData, &keyMsgData) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + if keyMsgData.GroupID != "" { + sessionType = constant.ReadGroupChatType + sendID = req.SendID + } else { + sessionType = constant.SingleChatType + sendID = keyMsgData.RecvID + recvID = keyMsgData.SendID + } + // check param + if keyMsgData.SendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap()) + return + } + if sendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap()) + return + } + + msgData := &sdkws.MsgData{ + SendID: sendID, + RecvID: recvID, + GroupID: keyMsgData.GroupID, + ClientMsgID: idutil.GetMsgIDByMD5(sendID), + SenderPlatformID: constant.AdminPlatformID, + SessionType: sessionType, + MsgFrom: constant.UserMsgType, + ContentType: constant.Text, + Content: []byte(req.Content), + OfflinePushInfo: req.OfflinePushInfo, + Ex: req.Ex, + } + + sendReq := &msg.SendMsgReq{ + MsgData: msgData, + } + + respPb, err := m.Client.SendMsg(c, sendReq) + if err != nil { + apiresp.GinError(c, err) + return + } + + var status = constant.MsgSendSuccessed + + _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ + Status: int32(status), + }) + + if err != nil { + apiresp.GinError(c, err) + return + } + + m.ginRespSendMsg(c, sendReq, respPb) +} + func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index c66dd6ca9..5bbe64f3d 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,7 +16,13 @@ package msg import ( "context" + "encoding/base64" + "encoding/json" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -131,11 +137,11 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) } -func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { +func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { - if msg.MsgData.ContentType != constant.Text { - return nil - } + //if msg.MsgData.ContentType != constant.Text { + // return nil + //} if !filterBeforeMsg(msg, before) { return nil } @@ -146,9 +152,14 @@ func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.B if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } - + if beforeMsgData != nil { + *beforeMsgData = proto.Clone(msg.MsgData).(*sdkws.MsgData) + } if resp.Content != nil { msg.MsgData.Content = []byte(*resp.Content) + if err := json.Unmarshal(msg.MsgData.Content, &struct{}{}); err != nil { + return errs.ErrArgs.WrapMsg("webhook msg modify content is not json", "content", string(msg.MsgData.Content)) + } } datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo) datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index c93f18148..0731dd8d4 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -29,26 +29,44 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { - if req.MsgData != nil { - m.encapsulateMsgData(req.MsgData) - switch req.MsgData.SessionType { - case constant.SingleChatType: - return m.sendMsgSingleChat(ctx, req) - case constant.NotificationChatType: - return m.sendMsgNotification(ctx, req) - case constant.ReadGroupChatType: - return m.sendMsgGroupChat(ctx, req) - default: - return nil, errs.ErrArgs.WrapMsg("unknown sessionType") - } + if req.MsgData == nil { + return nil, errs.ErrArgs.WrapMsg("msgData is nil") } - return nil, errs.ErrArgs.WrapMsg("msgData is nil") + before := new(*sdkws.MsgData) + resp, err := m.sendMsg(ctx, req, before) + if err != nil { + return nil, err + } + if *before != nil && proto.Equal(*before, req.MsgData) == false { + resp.Modify = req.MsgData + } + return resp, nil } -func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { + m.encapsulateMsgData(req.MsgData) + if req.MsgData.ContentType == constant.Stream { + if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { + return nil, err + } + } + switch req.MsgData.SessionType { + case constant.SingleChatType: + return m.sendMsgSingleChat(ctx, req, before) + case constant.NotificationChatType: + return m.sendMsgNotification(ctx, req, before) + case constant.ReadGroupChatType: + return m.sendMsgGroupChat(ctx, req, before) + default: + return nil, errs.ErrArgs.WrapMsg("unknown sessionType") + } +} + +func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err = m.messageVerification(ctx, req); err != nil { prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err @@ -57,7 +75,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil { return nil, err } - if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { return nil, err } err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) @@ -139,7 +157,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa } } -func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { return nil, err } @@ -151,7 +169,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR return resp, nil } -func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err := m.messageVerification(ctx, req); err != nil { return nil, err } @@ -171,12 +189,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq } if !isSend { prommetrics.SingleChatMsgProcessFailedCounter.Inc() - return nil, nil + return nil, errs.ErrArgs.WrapMsg("message is not sent") } else { - if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { return nil, err } - if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index f4deb9fb1..d3c1b25e2 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -15,6 +15,7 @@ package apistruct import ( + pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" ) @@ -124,4 +125,15 @@ type SingleReturnResult struct { // RecvID uniquely identifies the receiver of the message. RecvID string `json:"recvID"` + + // Modify fields modified via webhook. + Modify map[string]any `json:"modify,omitempty"` +} + +type SendMsgResp struct { + // SendMsgResp original response. + *pbmsg.SendMsgResp + + // Modify fields modified via webhook. + Modify map[string]any `json:"modify,omitempty"` } diff --git a/pkg/apistruct/msg_test.go b/pkg/apistruct/msg_test.go deleted file mode 100644 index 28f878a9f..000000000 --- a/pkg/apistruct/msg_test.go +++ /dev/null @@ -1 +0,0 @@ -package apistruct diff --git a/test/webhook/msgmodify/main.go b/test/webhook/msgmodify/main.go new file mode 100644 index 000000000..3bdfa0ec5 --- /dev/null +++ b/test/webhook/msgmodify/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/gin-gonic/gin" + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/protocol/constant" +) + +func main() { + g := gin.Default() + g.POST("/callbackExample/callbackBeforeMsgModifyCommand", toGin(handlerMsg)) + if err := g.Run(":10006"); err != nil { + panic(err) + } +} + +func toGin[R any](fn func(c *gin.Context, req *R)) gin.HandlerFunc { + return func(c *gin.Context) { + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) + var req R + if err := json.Unmarshal(body, &req); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + fn(c, &req) + } +} + +func handlerMsg(c *gin.Context, req *cbapi.CallbackMsgModifyCommandReq) { + var resp cbapi.CallbackMsgModifyCommandResp + if req.ContentType != constant.Text { + c.JSON(http.StatusOK, &resp) + return + } + var textElem struct { + Content string `json:"content"` + } + if err := json.Unmarshal([]byte(req.Content), &textElem); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + const word = "xxx" + if strings.Contains(textElem.Content, word) { + textElem.Content = strings.ReplaceAll(textElem.Content, word, strings.Repeat("*", len(word))) + content, err := json.Marshal(&textElem) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + tmp := string(content) + resp.Content = &tmp + } + c.JSON(http.StatusOK, &resp) +}