feat: sending messages supports returning fields modified by webhook (#3192)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group

* fix: user msg timestamp

* seq read config

* seq read config

* fix: the source message of the reference is withdrawn, and the referenced message is deleted

* feat: optimize the default notification.yml

* fix: shouldPushOffline

* fix: the sorting is wrong after canceling the administrator in group settings

* feat: Sending messages supports returning fields modified by webhook

* feat: Sending messages supports returning fields modified by webhook

* feat: Sending messages supports returning fields modified by webhook
This commit is contained in:
chao 2025-03-05 17:04:57 +08:00 committed by OpenIM-Robot
parent 8342f97349
commit 883ea4dcb9
8 changed files with 297 additions and 36 deletions

4
go.mod
View File

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.71
github.com/openimsdk/tools v0.0.50-alpha.72
github.com/openimsdk/protocol v0.0.72-alpha.79
github.com/openimsdk/tools v0.0.50-alpha.74
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

12
go.sum
View File

@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70=
github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY=
github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s=
github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc=
github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -15,9 +15,15 @@
package api
import (
"encoding/base64"
"encoding/json"
"sync"
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
"google.golang.org/protobuf/reflect/protoreflect"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -36,6 +42,39 @@ import (
"github.com/openimsdk/tools/utils/timeutil"
)
var (
msgDataDescriptor []protoreflect.FieldDescriptor
msgDataDescriptorOnce sync.Once
)
func getMsgDataDescriptor() []protoreflect.FieldDescriptor {
msgDataDescriptorOnce.Do(func() {
skip := make(map[string]struct{})
respFields := new(msg.SendMsgResp).ProtoReflect().Descriptor().Fields()
for i := 0; i < respFields.Len(); i++ {
field := respFields.Get(i)
if !field.HasJSONName() {
continue
}
skip[field.JSONName()] = struct{}{}
}
fields := new(sdkws.MsgData).ProtoReflect().Descriptor().Fields()
num := fields.Len()
msgDataDescriptor = make([]protoreflect.FieldDescriptor, 0, num)
for i := 0; i < num; i++ {
field := fields.Get(i)
if !field.HasJSONName() {
continue
}
if _, ok := skip[field.JSONName()]; ok {
continue
}
msgDataDescriptor = append(msgDataDescriptor, fields.Get(i))
}
})
return msgDataDescriptor
}
type MessageApi struct {
Client msg.MsgClient
userClient *rpcli.UserClient
@ -190,6 +229,42 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
return m.newUserSendMsgReq(c, &req), nil
}
func (m *MessageApi) getModifyFields(req, respModify *sdkws.MsgData) map[string]any {
if req == nil || respModify == nil {
return nil
}
fields := make(map[string]any)
reqProtoReflect := req.ProtoReflect()
respProtoReflect := respModify.ProtoReflect()
for _, descriptor := range getMsgDataDescriptor() {
reqValue := reqProtoReflect.Get(descriptor)
respValue := respProtoReflect.Get(descriptor)
if !reqValue.Equal(respValue) {
val := respValue.Interface()
name := descriptor.JSONName()
if name == "content" {
if bs, ok := val.([]byte); ok {
val = string(bs)
}
}
fields[name] = val
}
}
if len(fields) == 0 {
fields = nil
}
return fields
}
func (m *MessageApi) ginRespSendMsg(c *gin.Context, req *msg.SendMsgReq, resp *msg.SendMsgResp) {
res := m.getModifyFields(req.MsgData, resp.Modify)
resp.Modify = nil
apiresp.GinSuccess(c, &apistruct.SendMsgResp{
SendMsgResp: resp,
Modify: res,
})
}
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
func (m *MessageApi) SendMessage(c *gin.Context) {
// Initialize a request struct for sending a message.
@ -243,7 +318,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
}
// Respond with a success message and the response payload.
apiresp.GinSuccess(c, respPb)
m.ginRespSendMsg(c, sendMsgReq, respPb)
}
func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
@ -309,7 +384,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, respPb)
m.ginRespSendMsg(c, &sendMsgReq, respPb)
}
func (m *MessageApi) BatchSendMsg(c *gin.Context) {
@ -363,11 +438,93 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
ClientMsgID: rpcResp.ClientMsgID,
SendTime: rpcResp.SendTime,
RecvID: recvID,
Modify: m.getModifyFields(sendMsgReq.MsgData, rpcResp.Modify),
})
}
apiresp.GinSuccess(c, resp)
}
func (m *MessageApi) SendSimpleMessage(c *gin.Context) {
encodedKey, ok := c.GetQuery(webhook.Key)
if !ok {
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap())
return
}
decodedData, err := base64.StdEncoding.DecodeString(encodedKey)
if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
var (
req apistruct.SendSingleMsgReq
keyMsgData apistruct.KeyMsgData
sendID string
sessionType int32
recvID string
)
err = json.Unmarshal(decodedData, &keyMsgData)
if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if keyMsgData.GroupID != "" {
sessionType = constant.ReadGroupChatType
sendID = req.SendID
} else {
sessionType = constant.SingleChatType
sendID = keyMsgData.RecvID
recvID = keyMsgData.SendID
}
// check param
if keyMsgData.SendID == "" {
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap())
return
}
if sendID == "" {
apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap())
return
}
msgData := &sdkws.MsgData{
SendID: sendID,
RecvID: recvID,
GroupID: keyMsgData.GroupID,
ClientMsgID: idutil.GetMsgIDByMD5(sendID),
SenderPlatformID: constant.AdminPlatformID,
SessionType: sessionType,
MsgFrom: constant.UserMsgType,
ContentType: constant.Text,
Content: []byte(req.Content),
OfflinePushInfo: req.OfflinePushInfo,
Ex: req.Ex,
}
sendReq := &msg.SendMsgReq{
MsgData: msgData,
}
respPb, err := m.Client.SendMsg(c, sendReq)
if err != nil {
apiresp.GinError(c, err)
return
}
var status = constant.MsgSendSuccessed
_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
Status: int32(status),
})
if err != nil {
apiresp.GinError(c, err)
return
}
m.ginRespSendMsg(c, sendReq, respPb)
}
func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) {
a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client)
}

View File

@ -16,7 +16,13 @@ package msg
import (
"context"
"encoding/base64"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -131,11 +137,11 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.
m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after)
}
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
if msg.MsgData.ContentType != constant.Text {
return nil
}
//if msg.MsgData.ContentType != constant.Text {
// return nil
//}
if !filterBeforeMsg(msg, before) {
return nil
}
@ -146,9 +152,14 @@ func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.B
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if beforeMsgData != nil {
*beforeMsgData = proto.Clone(msg.MsgData).(*sdkws.MsgData)
}
if resp.Content != nil {
msg.MsgData.Content = []byte(*resp.Content)
if err := json.Unmarshal(msg.MsgData.Content, &struct{}{}); err != nil {
return errs.ErrArgs.WrapMsg("webhook msg modify content is not json", "content", string(msg.MsgData.Content))
}
}
datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo)
datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID)

View File

@ -29,26 +29,44 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/protobuf/proto"
)
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
if req.MsgData != nil {
m.encapsulateMsgData(req.MsgData)
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.ReadGroupChatType:
return m.sendMsgGroupChat(ctx, req)
default:
return nil, errs.ErrArgs.WrapMsg("unknown sessionType")
}
if req.MsgData == nil {
return nil, errs.ErrArgs.WrapMsg("msgData is nil")
}
return nil, errs.ErrArgs.WrapMsg("msgData is nil")
before := new(*sdkws.MsgData)
resp, err := m.sendMsg(ctx, req, before)
if err != nil {
return nil, err
}
if *before != nil && proto.Equal(*before, req.MsgData) == false {
resp.Modify = req.MsgData
}
return resp, nil
}
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) {
m.encapsulateMsgData(req.MsgData)
if req.MsgData.ContentType == constant.Stream {
if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil {
return nil, err
}
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req, before)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req, before)
case constant.ReadGroupChatType:
return m.sendMsgGroupChat(ctx, req, before)
default:
return nil, errs.ErrArgs.WrapMsg("unknown sessionType")
}
}
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
if err = m.messageVerification(ctx, req); err != nil {
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
return nil, err
@ -57,7 +75,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq)
if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil {
return nil, err
}
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil {
return nil, err
}
err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
@ -139,7 +157,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
}
}
func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
return nil, err
}
@ -151,7 +169,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR
return resp, nil
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) {
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}
@ -171,12 +189,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
}
if !isSend {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, nil
return nil, errs.ErrArgs.WrapMsg("message is not sent")
} else {
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil {
return nil, err
}
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err

View File

@ -15,6 +15,7 @@
package apistruct
import (
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
)
@ -124,4 +125,15 @@ type SingleReturnResult struct {
// RecvID uniquely identifies the receiver of the message.
RecvID string `json:"recvID"`
// Modify fields modified via webhook.
Modify map[string]any `json:"modify,omitempty"`
}
type SendMsgResp struct {
// SendMsgResp original response.
*pbmsg.SendMsgResp
// Modify fields modified via webhook.
Modify map[string]any `json:"modify,omitempty"`
}

View File

@ -1 +0,0 @@
package apistruct

View File

@ -0,0 +1,65 @@
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/gin-gonic/gin"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/protocol/constant"
)
func main() {
g := gin.Default()
g.POST("/callbackExample/callbackBeforeMsgModifyCommand", toGin(handlerMsg))
if err := g.Run(":10006"); err != nil {
panic(err)
}
}
func toGin[R any](fn func(c *gin.Context, req *R)) gin.HandlerFunc {
return func(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body)
var req R
if err := json.Unmarshal(body, &req); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
fn(c, &req)
}
}
func handlerMsg(c *gin.Context, req *cbapi.CallbackMsgModifyCommandReq) {
var resp cbapi.CallbackMsgModifyCommandResp
if req.ContentType != constant.Text {
c.JSON(http.StatusOK, &resp)
return
}
var textElem struct {
Content string `json:"content"`
}
if err := json.Unmarshal([]byte(req.Content), &textElem); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
const word = "xxx"
if strings.Contains(textElem.Content, word) {
textElem.Content = strings.ReplaceAll(textElem.Content, word, strings.Repeat("*", len(word)))
content, err := json.Marshal(&textElem)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
tmp := string(content)
resp.Content = &tmp
}
c.JSON(http.StatusOK, &resp)
}