mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	fix: solve incorrect when sendMsg webhook callback after.
This commit is contained in:
		
							parent
							
								
									b7ca3bd95f
								
							
						
					
					
						commit
						80a4bf114b
					
				
							
								
								
									
										127
									
								
								internal/msgtransfer/callback.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										127
									
								
								internal/msgtransfer/callback.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,127 @@ | |||||||
|  | package msgtransfer | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/base64" | ||||||
|  | 	"fmt" | ||||||
|  | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/protocol/sdkws" | ||||||
|  | 	"github.com/openimsdk/tools/mcontext" | ||||||
|  | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
|  | 	"github.com/openimsdk/tools/utils/stringutil" | ||||||
|  | 	"google.golang.org/protobuf/proto" | ||||||
|  | 
 | ||||||
|  | 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq { | ||||||
|  | 	return cbapi.CommonCallbackReq{ | ||||||
|  | 		SendID:           msg.SendID, | ||||||
|  | 		ServerMsgID:      msg.ServerMsgID, | ||||||
|  | 		CallbackCommand:  command, | ||||||
|  | 		ClientMsgID:      msg.ClientMsgID, | ||||||
|  | 		OperationID:      mcontext.GetOperationID(ctx), | ||||||
|  | 		SenderPlatformID: msg.SenderPlatformID, | ||||||
|  | 		SenderNickname:   msg.SenderNickname, | ||||||
|  | 		SessionType:      msg.SessionType, | ||||||
|  | 		MsgFrom:          msg.MsgFrom, | ||||||
|  | 		ContentType:      msg.ContentType, | ||||||
|  | 		Status:           msg.Status, | ||||||
|  | 		SendTime:         msg.SendTime, | ||||||
|  | 		CreateTime:       msg.CreateTime, | ||||||
|  | 		AtUserIDList:     msg.AtUserIDList, | ||||||
|  | 		SenderFaceURL:    msg.SenderFaceURL, | ||||||
|  | 		Content:          GetContent(msg), | ||||||
|  | 		Seq:              uint32(msg.Seq), | ||||||
|  | 		Ex:               msg.Ex, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func GetContent(msg *sdkws.MsgData) string { | ||||||
|  | 	if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { | ||||||
|  | 		var tips sdkws.TipsComm | ||||||
|  | 		_ = proto.Unmarshal(msg.Content, &tips) | ||||||
|  | 		content := tips.JsonDetail | ||||||
|  | 		return content | ||||||
|  | 	} else { | ||||||
|  | 		return string(msg.Content) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { | ||||||
|  | 	if msg.ContentType == constant.Typing { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	if !filterAfterMsg(msg, after) { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ | ||||||
|  | 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), | ||||||
|  | 		RecvID:            msg.RecvID, | ||||||
|  | 	} | ||||||
|  | 	mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { | ||||||
|  | 	if msg.ContentType == constant.Typing { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	if !filterAfterMsg(msg, after) { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	fmt.Println("webhookAfterSendGroupMsg true excute", msg.Seq) | ||||||
|  | 
 | ||||||
|  | 	cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ | ||||||
|  | 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), | ||||||
|  | 		GroupID:           msg.GroupID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { | ||||||
|  | 	keyMsgData := apistruct.KeyMsgData{ | ||||||
|  | 		SendID:  msg.SendID, | ||||||
|  | 		RecvID:  msg.RecvID, | ||||||
|  | 		GroupID: msg.GroupID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return map[string]string{ | ||||||
|  | 		webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool { | ||||||
|  | 	return filterMsg(msg, after.AttentionIds, after.DeniedTypes) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool { | ||||||
|  | 	// According to the attentionIds configuration, only some users are sent | ||||||
|  | 	if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if defaultDeniedTypes(msg.ContentType) { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func defaultDeniedTypes(contentType int32) bool { | ||||||
|  | 	if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	if contentType == constant.Typing { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	return false | ||||||
|  | } | ||||||
| @ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) | 	historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) | ||||||
| 
 | 
 | ||||||
| 	msgTransfer := &MsgTransfer{ | 	msgTransfer := &MsgTransfer{ | ||||||
| 		historyConsumer:      historyConsumer, | 		historyConsumer:      historyConsumer, | ||||||
|  | |||||||
| @ -19,6 +19,8 @@ import ( | |||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	pbmsg "github.com/openimsdk/protocol/msg" | 	pbmsg "github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"google.golang.org/protobuf/proto" | 	"google.golang.org/protobuf/proto" | ||||||
| @ -26,11 +28,15 @@ import ( | |||||||
| 
 | 
 | ||||||
| type OnlineHistoryMongoConsumerHandler struct { | type OnlineHistoryMongoConsumerHandler struct { | ||||||
| 	msgTransferDatabase controller.MsgTransferDatabase | 	msgTransferDatabase controller.MsgTransferDatabase | ||||||
|  | 	config              *Config | ||||||
|  | 	webhookClient       *webhook.Client | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { | func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler { | ||||||
| 	return &OnlineHistoryMongoConsumerHandler{ | 	return &OnlineHistoryMongoConsumerHandler{ | ||||||
| 		msgTransferDatabase: database, | 		msgTransferDatabase: database, | ||||||
|  | 		config:              config, | ||||||
|  | 		webhookClient:       webhook.NewWebhookClient(config.WebhooksConfig.URL), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont | |||||||
| 	} else { | 	} else { | ||||||
| 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, msgData := range msgFromMQ.MsgData { | ||||||
|  | 		switch msgData.SessionType { | ||||||
|  | 		case constant.SingleChatType: | ||||||
|  | 			mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) | ||||||
|  | 		case constant.ReadGroupChatType: | ||||||
|  | 			mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	//var seqs []int64 | 	//var seqs []int64 | ||||||
| 	//for _, msg := range msgFromMQ.MsgData { | 	//for _, msg := range msgFromMQ.MsgData { | ||||||
| 	//	seqs = append(seqs, msg.Seq) | 	//	seqs = append(seqs, msg.Seq) | ||||||
|  | |||||||
| @ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, | |||||||
| 		go m.setConversationAtInfo(ctx, req.MsgData) | 		go m.setConversationAtInfo(ctx, req.MsgData) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) | 	// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) | ||||||
|  | 
 | ||||||
| 	prommetrics.GroupChatMsgProcessSuccessCounter.Inc() | 	prommetrics.GroupChatMsgProcessSuccessCounter.Inc() | ||||||
| 	resp = &pbmsg.SendMsgResp{} | 	resp = &pbmsg.SendMsgResp{} | ||||||
| 	resp.SendTime = req.MsgData.SendTime | 	resp.SendTime = req.MsgData.SendTime | ||||||
| @ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq | |||||||
| 			prommetrics.SingleChatMsgProcessFailedCounter.Inc() | 			prommetrics.SingleChatMsgProcessFailedCounter.Inc() | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 		m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) | 
 | ||||||
|  | 		// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) | ||||||
| 		prommetrics.SingleChatMsgProcessSuccessCounter.Inc() | 		prommetrics.SingleChatMsgProcessSuccessCounter.Inc() | ||||||
| 		return &pbmsg.SendMsgResp{ | 		return &pbmsg.SendMsgResp{ | ||||||
| 			ServerMsgID: req.MsgData.ServerMsgID, | 			ServerMsgID: req.MsgData.ServerMsgID, | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user