mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 13:12:12 +08:00 
			
		
		
		
	feat: Sending messages supports returning fields modified by webhook
This commit is contained in:
		
							parent
							
								
									13356b6d49
								
							
						
					
					
						commit
						86bdcbcde6
					
				
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | |||||||
| 	github.com/gorilla/websocket v1.5.1 | 	github.com/gorilla/websocket v1.5.1 | ||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.78 | 	github.com/openimsdk/protocol v0.0.72-alpha.79 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.74 | 	github.com/openimsdk/tools v0.0.50-alpha.74 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
| @ -219,3 +219,5 @@ require ( | |||||||
| 	golang.org/x/crypto v0.27.0 // indirect | 	golang.org/x/crypto v0.27.0 // indirect | ||||||
| 	gopkg.in/ini.v1 v1.67.0 // indirect | 	gopkg.in/ini.v1 v1.67.0 // indirect | ||||||
| ) | ) | ||||||
|  | 
 | ||||||
|  | //replace github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | |||||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= | github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.78 h1:n9HVj5olMPiGLF3Z4apPvvYzn2yOpyrsn2/YiAaIsxw= | github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.78/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= | github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
|  | |||||||
| @ -17,10 +17,12 @@ package api | |||||||
| import ( | import ( | ||||||
| 	"encoding/base64" | 	"encoding/base64" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
|  | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
| 	"github.com/go-playground/validator/v10" | 	"github.com/go-playground/validator/v10" | ||||||
| 	"github.com/mitchellh/mapstructure" | 	"github.com/mitchellh/mapstructure" | ||||||
|  | 	"google.golang.org/protobuf/reflect/protoreflect" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||||
| @ -41,6 +43,39 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/utils/timeutil" | 	"github.com/openimsdk/tools/utils/timeutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | var ( | ||||||
|  | 	msgDataDescriptor     []protoreflect.FieldDescriptor | ||||||
|  | 	msgDataDescriptorOnce sync.Once | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func getMsgDataDescriptor() []protoreflect.FieldDescriptor { | ||||||
|  | 	msgDataDescriptorOnce.Do(func() { | ||||||
|  | 		skip := make(map[string]struct{}) | ||||||
|  | 		respFields := new(msg.SendMsgResp).ProtoReflect().Descriptor().Fields() | ||||||
|  | 		for i := 0; i < respFields.Len(); i++ { | ||||||
|  | 			field := respFields.Get(i) | ||||||
|  | 			if !field.HasJSONName() { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			skip[field.JSONName()] = struct{}{} | ||||||
|  | 		} | ||||||
|  | 		fields := new(sdkws.MsgData).ProtoReflect().Descriptor().Fields() | ||||||
|  | 		num := fields.Len() | ||||||
|  | 		msgDataDescriptor = make([]protoreflect.FieldDescriptor, 0, num) | ||||||
|  | 		for i := 0; i < num; i++ { | ||||||
|  | 			field := fields.Get(i) | ||||||
|  | 			if !field.HasJSONName() { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			if _, ok := skip[field.JSONName()]; ok { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			msgDataDescriptor = append(msgDataDescriptor, fields.Get(i)) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	return msgDataDescriptor | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type MessageApi struct { | type MessageApi struct { | ||||||
| 	Client        msg.MsgClient | 	Client        msg.MsgClient | ||||||
| 	userClient    *rpcli.UserClient | 	userClient    *rpcli.UserClient | ||||||
| @ -197,6 +232,42 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM | |||||||
| 	return m.newUserSendMsgReq(c, &req), nil | 	return m.newUserSendMsgReq(c, &req), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (m *MessageApi) getModifyFields(req, respModify *sdkws.MsgData) map[string]any { | ||||||
|  | 	if req == nil || respModify == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	fields := make(map[string]any) | ||||||
|  | 	reqProtoReflect := req.ProtoReflect() | ||||||
|  | 	respProtoReflect := respModify.ProtoReflect() | ||||||
|  | 	for _, descriptor := range getMsgDataDescriptor() { | ||||||
|  | 		reqValue := reqProtoReflect.Get(descriptor) | ||||||
|  | 		respValue := respProtoReflect.Get(descriptor) | ||||||
|  | 		if !reqValue.Equal(respValue) { | ||||||
|  | 			val := respValue.Interface() | ||||||
|  | 			name := descriptor.JSONName() | ||||||
|  | 			if name == "content" { | ||||||
|  | 				if bs, ok := val.([]byte); ok { | ||||||
|  | 					val = string(bs) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			fields[name] = val | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(fields) == 0 { | ||||||
|  | 		fields = nil | ||||||
|  | 	} | ||||||
|  | 	return fields | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *MessageApi) ginRespSendMsg(c *gin.Context, req *msg.SendMsgReq, resp *msg.SendMsgResp) { | ||||||
|  | 	res := m.getModifyFields(req.MsgData, resp.Modify) | ||||||
|  | 	resp.Modify = nil | ||||||
|  | 	apiresp.GinSuccess(c, &apistruct.SendMsgResp{ | ||||||
|  | 		SendMsgResp: resp, | ||||||
|  | 		Modify:      res, | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework. | // SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework. | ||||||
| func (m *MessageApi) SendMessage(c *gin.Context) { | func (m *MessageApi) SendMessage(c *gin.Context) { | ||||||
| 	// Initialize a request struct for sending a message. | 	// Initialize a request struct for sending a message. | ||||||
| @ -250,7 +321,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Respond with a success message and the response payload. | 	// Respond with a success message and the response payload. | ||||||
| 	apiresp.GinSuccess(c, respPb) | 	m.ginRespSendMsg(c, sendMsgReq, respPb) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *MessageApi) SendBusinessNotification(c *gin.Context) { | func (m *MessageApi) SendBusinessNotification(c *gin.Context) { | ||||||
| @ -316,7 +387,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { | |||||||
| 		apiresp.GinError(c, err) | 		apiresp.GinError(c, err) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	apiresp.GinSuccess(c, respPb) | 	m.ginRespSendMsg(c, &sendMsgReq, respPb) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *MessageApi) BatchSendMsg(c *gin.Context) { | func (m *MessageApi) BatchSendMsg(c *gin.Context) { | ||||||
| @ -370,6 +441,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { | |||||||
| 			ClientMsgID: rpcResp.ClientMsgID, | 			ClientMsgID: rpcResp.ClientMsgID, | ||||||
| 			SendTime:    rpcResp.SendTime, | 			SendTime:    rpcResp.SendTime, | ||||||
| 			RecvID:      recvID, | 			RecvID:      recvID, | ||||||
|  | 			Modify:      m.getModifyFields(sendMsgReq.MsgData, rpcResp.Modify), | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| 	apiresp.GinSuccess(c, resp) | 	apiresp.GinSuccess(c, resp) | ||||||
| @ -432,7 +504,11 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) { | |||||||
| 		Ex:               req.Ex, | 		Ex:               req.Ex, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	respPb, err := m.Client.SendMsg(c, &msg.SendMsgReq{MsgData: msgData}) | 	sendReq := &msg.SendMsgReq{ | ||||||
|  | 		MsgData: msgData, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	respPb, err := m.Client.SendMsg(c, sendReq) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		apiresp.GinError(c, err) | 		apiresp.GinError(c, err) | ||||||
| 		return | 		return | ||||||
| @ -449,7 +525,7 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	apiresp.GinSuccess(c, respPb) | 	m.ginRespSendMsg(c, sendReq, respPb) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { | func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { | ||||||
|  | |||||||
| @ -17,9 +17,11 @@ package msg | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"encoding/base64" | 	"encoding/base64" | ||||||
|  | 	"encoding/json" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||||
|  | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"github.com/openimsdk/tools/utils/stringutil" | 	"github.com/openimsdk/tools/utils/stringutil" | ||||||
| 
 | 
 | ||||||
| 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | ||||||
| @ -136,11 +138,11 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. | |||||||
| 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { | func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { | ||||||
| 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | ||||||
| 		if msg.MsgData.ContentType != constant.Text { | 		//if msg.MsgData.ContentType != constant.Text { | ||||||
| 			return nil | 		//	return nil | ||||||
| 		} | 		//} | ||||||
| 		if !filterBeforeMsg(msg, before) { | 		if !filterBeforeMsg(msg, before) { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| @ -151,9 +153,14 @@ func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.B | |||||||
| 		if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { | 		if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 
 | 		if beforeMsgData != nil { | ||||||
|  | 			*beforeMsgData = proto.Clone(msg.MsgData).(*sdkws.MsgData) | ||||||
|  | 		} | ||||||
| 		if resp.Content != nil { | 		if resp.Content != nil { | ||||||
| 			msg.MsgData.Content = []byte(*resp.Content) | 			msg.MsgData.Content = []byte(*resp.Content) | ||||||
|  | 			if err := json.Unmarshal(msg.MsgData.Content, &struct{}{}); err != nil { | ||||||
|  | 				return errs.ErrArgs.WrapMsg("webhook msg modify content is not json", "content", string(msg.MsgData.Content)) | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 		datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo) | 		datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo) | ||||||
| 		datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID) | 		datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID) | ||||||
|  | |||||||
| @ -29,31 +29,44 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
|  | 	"google.golang.org/protobuf/proto" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { | func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { | ||||||
| 	if req.MsgData != nil { | 	if req.MsgData == nil { | ||||||
| 		m.encapsulateMsgData(req.MsgData) | 		return nil, errs.ErrArgs.WrapMsg("msgData is nil") | ||||||
| 		if req.MsgData.ContentType == constant.Stream { |  | ||||||
| 			if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		switch req.MsgData.SessionType { |  | ||||||
| 		case constant.SingleChatType: |  | ||||||
| 			return m.sendMsgSingleChat(ctx, req) |  | ||||||
| 		case constant.NotificationChatType: |  | ||||||
| 			return m.sendMsgNotification(ctx, req) |  | ||||||
| 		case constant.ReadGroupChatType: |  | ||||||
| 			return m.sendMsgGroupChat(ctx, req) |  | ||||||
| 		default: |  | ||||||
| 			return nil, errs.ErrArgs.WrapMsg("unknown sessionType") |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| 	return nil, errs.ErrArgs.WrapMsg("msgData is nil") | 	before := new(*sdkws.MsgData) | ||||||
|  | 	resp, err := m.sendMsg(ctx, req, before) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if *before != nil && proto.Equal(*before, req.MsgData) == false { | ||||||
|  | 		resp.Modify = req.MsgData | ||||||
|  | 	} | ||||||
|  | 	return resp, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { | func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { | ||||||
|  | 	m.encapsulateMsgData(req.MsgData) | ||||||
|  | 	if req.MsgData.ContentType == constant.Stream { | ||||||
|  | 		if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	switch req.MsgData.SessionType { | ||||||
|  | 	case constant.SingleChatType: | ||||||
|  | 		return m.sendMsgSingleChat(ctx, req, before) | ||||||
|  | 	case constant.NotificationChatType: | ||||||
|  | 		return m.sendMsgNotification(ctx, req, before) | ||||||
|  | 	case constant.ReadGroupChatType: | ||||||
|  | 		return m.sendMsgGroupChat(ctx, req, before) | ||||||
|  | 	default: | ||||||
|  | 		return nil, errs.ErrArgs.WrapMsg("unknown sessionType") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { | ||||||
| 	if err = m.messageVerification(ctx, req); err != nil { | 	if err = m.messageVerification(ctx, req); err != nil { | ||||||
| 		prommetrics.GroupChatMsgProcessFailedCounter.Inc() | 		prommetrics.GroupChatMsgProcessFailedCounter.Inc() | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -62,7 +75,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) | |||||||
| 	if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil { | 	if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { | 	if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) | 	err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) | ||||||
| @ -144,7 +157,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { | func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { | ||||||
| 	if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { | 	if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -156,7 +169,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR | |||||||
| 	return resp, nil | 	return resp, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { | func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { | ||||||
| 	if err := m.messageVerification(ctx, req); err != nil { | 	if err := m.messageVerification(ctx, req); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -176,12 +189,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq | |||||||
| 	} | 	} | ||||||
| 	if !isSend { | 	if !isSend { | ||||||
| 		prommetrics.SingleChatMsgProcessFailedCounter.Inc() | 		prommetrics.SingleChatMsgProcessFailedCounter.Inc() | ||||||
| 		return nil, nil | 		return nil, errs.ErrArgs.WrapMsg("message is not sent") | ||||||
| 	} else { | 	} else { | ||||||
| 		if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { | 		if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 		if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { | 		if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { | ||||||
| 			prommetrics.SingleChatMsgProcessFailedCounter.Inc() | 			prommetrics.SingleChatMsgProcessFailedCounter.Inc() | ||||||
| 			return nil, err | 			return nil, err | ||||||
|  | |||||||
| @ -15,6 +15,7 @@ | |||||||
| package apistruct | package apistruct | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	pbmsg "github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -139,4 +140,15 @@ type SingleReturnResult struct { | |||||||
| 
 | 
 | ||||||
| 	// RecvID uniquely identifies the receiver of the message. | 	// RecvID uniquely identifies the receiver of the message. | ||||||
| 	RecvID string `json:"recvID"` | 	RecvID string `json:"recvID"` | ||||||
|  | 
 | ||||||
|  | 	// Modify fields modified via webhook. | ||||||
|  | 	Modify map[string]any `json:"modify,omitempty"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type SendMsgResp struct { | ||||||
|  | 	// SendMsgResp original response. | ||||||
|  | 	*pbmsg.SendMsgResp | ||||||
|  | 
 | ||||||
|  | 	// Modify fields modified via webhook. | ||||||
|  | 	Modify map[string]any `json:"modify,omitempty"` | ||||||
| } | } | ||||||
|  | |||||||
| @ -1 +0,0 @@ | |||||||
| package apistruct |  | ||||||
							
								
								
									
										65
									
								
								tools/msgmodify/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								tools/msgmodify/main.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,65 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"github.com/gin-gonic/gin" | ||||||
|  | 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func main() { | ||||||
|  | 	g := gin.Default() | ||||||
|  | 	g.POST("/callbackExample/callbackBeforeMsgModifyCommand", toGin(handlerMsg)) | ||||||
|  | 	if err := g.Run(":10006"); err != nil { | ||||||
|  | 		panic(err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func toGin[R any](fn func(c *gin.Context, req *R)) gin.HandlerFunc { | ||||||
|  | 	return func(c *gin.Context) { | ||||||
|  | 		body, err := io.ReadAll(c.Request.Body) | ||||||
|  | 		if err != nil { | ||||||
|  | 			c.String(http.StatusInternalServerError, err.Error()) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) | ||||||
|  | 		var req R | ||||||
|  | 		if err := json.Unmarshal(body, &req); err != nil { | ||||||
|  | 			c.String(http.StatusInternalServerError, err.Error()) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		fn(c, &req) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func handlerMsg(c *gin.Context, req *cbapi.CallbackMsgModifyCommandReq) { | ||||||
|  | 	var resp cbapi.CallbackMsgModifyCommandResp | ||||||
|  | 	if req.ContentType != constant.Text { | ||||||
|  | 		c.JSON(http.StatusOK, &resp) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	var textElem struct { | ||||||
|  | 		Content string `json:"content"` | ||||||
|  | 	} | ||||||
|  | 	if err := json.Unmarshal([]byte(req.Content), &textElem); err != nil { | ||||||
|  | 		c.String(http.StatusInternalServerError, err.Error()) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	const word = "xxx" | ||||||
|  | 	if strings.Contains(textElem.Content, word) { | ||||||
|  | 		textElem.Content = strings.ReplaceAll(textElem.Content, word, strings.Repeat("*", len(word))) | ||||||
|  | 		content, err := json.Marshal(&textElem) | ||||||
|  | 		if err != nil { | ||||||
|  | 			c.String(http.StatusInternalServerError, err.Error()) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		tmp := string(content) | ||||||
|  | 		resp.Content = &tmp | ||||||
|  | 	} | ||||||
|  | 	c.JSON(http.StatusOK, &resp) | ||||||
|  | } | ||||||
| @ -1,161 +0,0 @@ | |||||||
| package main |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"bytes" |  | ||||||
| 	"context" |  | ||||||
| 	"encoding/json" |  | ||||||
| 	"fmt" |  | ||||||
| 	"github.com/gin-gonic/gin" |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" |  | ||||||
| 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" |  | ||||||
| 	"github.com/openimsdk/protocol/auth" |  | ||||||
| 	"github.com/openimsdk/protocol/constant" |  | ||||||
| 	"github.com/openimsdk/protocol/msg" |  | ||||||
| 	"github.com/openimsdk/tools/apiresp" |  | ||||||
| 	"github.com/openimsdk/tools/errs" |  | ||||||
| 	"io" |  | ||||||
| 	"net/http" |  | ||||||
| 	"strings" |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	getAdminToken   = "/auth/get_admin_token" |  | ||||||
| 	sendMsgApi      = "/msg/send_msg" |  | ||||||
| 	appendStreamMsg = "/msg/append_stream_msg" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| var ( |  | ||||||
| 	ApiAddr = "http://127.0.0.1:10002" |  | ||||||
| 	Token   string |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func ApiCall[R any](api string, req any) (*R, error) { |  | ||||||
| 	data, err := json.Marshal(req) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) |  | ||||||
| 	defer cancel() |  | ||||||
| 	request, err := http.NewRequestWithContext(ctx, http.MethodPost, ApiAddr+api, bytes.NewBuffer(data)) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	if Token != "" { |  | ||||||
| 		request.Header.Set("token", Token) |  | ||||||
| 	} |  | ||||||
| 	request.Header.Set(constant.OperationID, uuid.New().String()) |  | ||||||
| 	response, err := http.DefaultClient.Do(request) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	defer response.Body.Close() |  | ||||||
| 	var resp R |  | ||||||
| 	apiResponse := apiresp.ApiResponse{ |  | ||||||
| 		Data: &resp, |  | ||||||
| 	} |  | ||||||
| 	if err := json.NewDecoder(response.Body).Decode(&apiResponse); err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	if apiResponse.ErrCode != 0 { |  | ||||||
| 		return nil, errs.NewCodeError(apiResponse.ErrCode, apiResponse.ErrMsg) |  | ||||||
| 	} |  | ||||||
| 	return &resp, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func main() { |  | ||||||
| 	resp, err := ApiCall[auth.GetAdminTokenResp](getAdminToken, &auth.GetAdminTokenReq{ |  | ||||||
| 		Secret: "openIM123", |  | ||||||
| 		UserID: "imAdmin", |  | ||||||
| 	}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		fmt.Println("get admin token failed", err) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	Token = resp.Token |  | ||||||
| 	g := gin.Default() |  | ||||||
| 	g.POST("/callbackExample/callbackAfterSendSingleMsgCommand", toGin(handlerUserMsg)) |  | ||||||
| 	if err := g.Run(":10006"); err != nil { |  | ||||||
| 		panic(err) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func toGin[R any](fn func(c *gin.Context, req *R) error) gin.HandlerFunc { |  | ||||||
| 	return func(c *gin.Context) { |  | ||||||
| 		body, err := io.ReadAll(c.Request.Body) |  | ||||||
| 		if err != nil { |  | ||||||
| 			c.String(http.StatusInternalServerError, err.Error()) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) |  | ||||||
| 		var req R |  | ||||||
| 		if err := json.Unmarshal(body, &req); err != nil { |  | ||||||
| 			c.String(http.StatusInternalServerError, err.Error()) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		if err := fn(c, &req); err != nil { |  | ||||||
| 			c.String(http.StatusInternalServerError, err.Error()) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		c.String(http.StatusOK, "{}") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func handlerUserMsg(c *gin.Context, req *cbapi.CallbackAfterSendSingleMsgReq) error { |  | ||||||
| 	if req.ContentType != constant.Text { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 	if !strings.Contains(req.Content, "stream") { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 	apiReq := apistruct.SendMsgReq{ |  | ||||||
| 		RecvID: req.SendID, |  | ||||||
| 		SendMsg: apistruct.SendMsg{ |  | ||||||
| 			SendID:           req.RecvID, |  | ||||||
| 			SenderNickname:   "xxx", |  | ||||||
| 			SenderFaceURL:    "", |  | ||||||
| 			SenderPlatformID: constant.AdminPlatformID, |  | ||||||
| 			ContentType:      constant.Stream, |  | ||||||
| 			SessionType:      req.SessionType, |  | ||||||
| 			SendTime:         time.Now().UnixMilli(), |  | ||||||
| 			Content: map[string]any{ |  | ||||||
| 				"type":    "xxx", |  | ||||||
| 				"content": "server test stream msg", |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	go func() { |  | ||||||
| 		if err := doPushStreamMsg(&apiReq); err != nil { |  | ||||||
| 			fmt.Println("doPushStreamMsg failed", err) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		fmt.Println("doPushStreamMsg success") |  | ||||||
| 	}() |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func doPushStreamMsg(sendReq *apistruct.SendMsgReq) error { |  | ||||||
| 	resp, err := ApiCall[msg.SendMsgResp](sendMsgApi, sendReq) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	const num = 5 |  | ||||||
| 	for i := 1; i <= num; i++ { |  | ||||||
| 		_, err := ApiCall[msg.AppendStreamMsgResp](appendStreamMsg, &msg.AppendStreamMsgReq{ |  | ||||||
| 			ClientMsgID: resp.ClientMsgID, |  | ||||||
| 			StartIndex:  int64(i - 1), |  | ||||||
| 			Packets: []string{ |  | ||||||
| 				fmt.Sprintf("stream_msg_packet_%03d", i), |  | ||||||
| 			}, |  | ||||||
| 			End: i == num, |  | ||||||
| 		}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			fmt.Println("append stream msg failed", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num), "error", err) |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 		fmt.Println("append stream msg success", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num)) |  | ||||||
| 		time.Sleep(time.Second * 10) |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user