mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 12:42:12 +08:00 
			
		
		
		
	Merge 706e374777c54f67d6d7b36d72d8aacc87bbe79e into 11358404f9e6bfb15bc62445cedeefbad9bd342c
This commit is contained in:
		
						commit
						b34805e6bd
					
				| @ -41,6 +41,9 @@ afterSendGroupMsg: | |||||||
|   attentionIds: [] |   attentionIds: [] | ||||||
|   # See beforeSendSingleMsg comment. |   # See beforeSendSingleMsg comment. | ||||||
|   deniedTypes: [] |   deniedTypes: [] | ||||||
|  | afterMsgSaveDB: | ||||||
|  |   enable: false | ||||||
|  |   timeout: 5 | ||||||
| afterUserOnline: | afterUserOnline: | ||||||
|   enable: false |   enable: false | ||||||
|   timeout: 5 |   timeout: 5 | ||||||
|  | |||||||
| @ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { | func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { | ||||||
| 	if msg.ContentType == constant.Typing { |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if !filterAfterMsg(msg, after) { | 	if !filterAfterMsg(msg, after) { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ | 	cbReq := &cbapi.CallbackAfterMsgSaveDBReq{ | ||||||
| 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), | 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand), | ||||||
| 		RecvID:            msg.RecvID, |  | ||||||
| 	} |  | ||||||
| 	mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { |  | ||||||
| 	if msg.ContentType == constant.Typing { |  | ||||||
| 		return |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !filterAfterMsg(msg, after) { | 	switch msg.SessionType { | ||||||
| 		return | 	case constant.SingleChatType, constant.NotificationChatType: | ||||||
|  | 		cbReq.RecvID = msg.RecvID | ||||||
|  | 	case constant.ReadGroupChatType: | ||||||
|  | 		cbReq.GroupID = msg.GroupID | ||||||
|  | 	default: | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ | 	mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg)) | ||||||
| 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), |  | ||||||
| 		GroupID:           msg.GroupID, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { | func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { | ||||||
|  | |||||||
| @ -15,7 +15,6 @@ | |||||||
| package msgtransfer | package msgtransfer | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/openimsdk/protocol/constant" |  | ||||||
| 	"github.com/openimsdk/tools/mq" | 	"github.com/openimsdk/tools/mq" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| @ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) | |||||||
| 	log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) | 	log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) | ||||||
| 	err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) | 	err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) | 		log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) | ||||||
| 		prommetrics.MsgInsertMongoFailedCounter.Inc() | 		prommetrics.MsgInsertMongoFailedCounter.Inc() | ||||||
| 	} else { | 	} else { | ||||||
| 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | ||||||
| @ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, msgData := range msgFromMQ.MsgData { | 	for _, msgData := range msgFromMQ.MsgData { | ||||||
| 		switch msgData.SessionType { | 		mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData) | ||||||
| 		case constant.SingleChatType: |  | ||||||
| 			mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) |  | ||||||
| 		case constant.ReadGroupChatType: |  | ||||||
| 			mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	//var seqs []int64 | 	//var seqs []int64 | ||||||
|  | |||||||
| @ -16,8 +16,10 @@ package msg | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/base64" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 
 | 
 | ||||||
| @ -28,6 +30,7 @@ import ( | |||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
|  | 	"github.com/openimsdk/tools/utils/stringutil" | ||||||
| 	"google.golang.org/protobuf/proto" | 	"google.golang.org/protobuf/proto" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Move to msgtransfer | // Move to msgtransfer | ||||||
| // func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { | func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { | ||||||
| // 	if msg.MsgData.ContentType == constant.Typing { | 	if msg.MsgData.ContentType == constant.Typing { | ||||||
| // 		return | 		return | ||||||
| // 	} | 	} | ||||||
| // 	if !filterAfterMsg(msg, after) { | 	if !filterAfterMsg(msg, after) { | ||||||
| // 		return | 		return | ||||||
| // 	} | 	} | ||||||
| // 	cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ | 	cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ | ||||||
| // 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), | 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), | ||||||
| // 		RecvID:            msg.MsgData.RecvID, | 		RecvID:            msg.MsgData.RecvID, | ||||||
| // 	} | 	} | ||||||
| // 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | ||||||
| // } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { | func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { | ||||||
| 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | ||||||
| @ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Move to msgtransfer | func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { | ||||||
| // func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { | 	if msg.MsgData.ContentType == constant.Typing { | ||||||
| // 	if msg.MsgData.ContentType == constant.Typing { | 		return | ||||||
| // 		return | 	} | ||||||
| // 	} | 	if !filterAfterMsg(msg, after) { | ||||||
| // 	if !filterAfterMsg(msg, after) { | 		return | ||||||
| // 		return | 	} | ||||||
| // 	} | 	cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ | ||||||
| // 	cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ | 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), | ||||||
| // 		CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), | 		GroupID:           msg.MsgData.GroupID, | ||||||
| // 		GroupID:           msg.MsgData.GroupID, | 	} | ||||||
| // 	} |  | ||||||
| 
 | 
 | ||||||
| // 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | 	m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) | ||||||
| // } | } | ||||||
| 
 | 
 | ||||||
| func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { | func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { | ||||||
| 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | ||||||
| @ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft | |||||||
| 	m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) | 	m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { | func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { | ||||||
| // 	keyMsgData := apistruct.KeyMsgData{ | 	keyMsgData := apistruct.KeyMsgData{ | ||||||
| // 		SendID:  msg.SendID, | 		SendID:  msg.SendID, | ||||||
| // 		RecvID:  msg.RecvID, | 		RecvID:  msg.RecvID, | ||||||
| // 		GroupID: msg.GroupID, | 		GroupID: msg.GroupID, | ||||||
| // 	} | 	} | ||||||
| 
 | 
 | ||||||
| // 	return map[string]string{ | 	return map[string]string{ | ||||||
| // 		webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), | 		webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), | ||||||
| // 	} | 	} | ||||||
| // } | } | ||||||
|  | |||||||
| @ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, | |||||||
| 		go m.setConversationAtInfo(ctx, req.MsgData) | 		go m.setConversationAtInfo(ctx, req.MsgData) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) | 	m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) | ||||||
| 
 | 
 | ||||||
| 	prommetrics.GroupChatMsgProcessSuccessCounter.Inc() | 	prommetrics.GroupChatMsgProcessSuccessCounter.Inc() | ||||||
| 	resp = &pbmsg.SendMsgResp{} | 	resp = &pbmsg.SendMsgResp{} | ||||||
| @ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq | |||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) | 		m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) | ||||||
| 		prommetrics.SingleChatMsgProcessSuccessCounter.Inc() | 		prommetrics.SingleChatMsgProcessSuccessCounter.Inc() | ||||||
| 		return &pbmsg.SendMsgResp{ | 		return &pbmsg.SendMsgResp{ | ||||||
| 			ServerMsgID: req.MsgData.ServerMsgID, | 			ServerMsgID: req.MsgData.ServerMsgID, | ||||||
|  | |||||||
| @ -66,4 +66,5 @@ const ( | |||||||
| 	CallbackAfterCreateSingleChatConversationsCommand  = "callbackAfterCreateSingleChatConversationsCommand" | 	CallbackAfterCreateSingleChatConversationsCommand  = "callbackAfterCreateSingleChatConversationsCommand" | ||||||
| 	CallbackBeforeCreateGroupChatConversationsCommand  = "callbackBeforeCreateGroupChatConversationsCommand" | 	CallbackBeforeCreateGroupChatConversationsCommand  = "callbackBeforeCreateGroupChatConversationsCommand" | ||||||
| 	CallbackAfterCreateGroupChatConversationsCommand   = "callbackAfterCreateGroupChatConversationsCommand" | 	CallbackAfterCreateGroupChatConversationsCommand   = "callbackAfterCreateGroupChatConversationsCommand" | ||||||
|  | 	CallbackAfterMsgSaveDBCommand                      = "callbackAfterMsgSaveDBCommand" | ||||||
| ) | ) | ||||||
|  | |||||||
| @ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct { | |||||||
| type CallbackSingleMsgReadResp struct { | type CallbackSingleMsgReadResp struct { | ||||||
| 	CommonCallbackResp | 	CommonCallbackResp | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | type CallbackAfterMsgSaveDBReq struct { | ||||||
|  | 	CommonCallbackReq | ||||||
|  | 	RecvID  string `json:"recvID"` | ||||||
|  | 	GroupID string `json:"groupID"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type CallbackAfterMsgSaveDBResp struct { | ||||||
|  | 	CommonCallbackResp | ||||||
|  | } | ||||||
|  | |||||||
| @ -436,6 +436,7 @@ type Webhooks struct { | |||||||
| 	BeforeSendGroupMsg                  BeforeConfig `yaml:"beforeSendGroupMsg"` | 	BeforeSendGroupMsg                  BeforeConfig `yaml:"beforeSendGroupMsg"` | ||||||
| 	BeforeMsgModify                     BeforeConfig `yaml:"beforeMsgModify"` | 	BeforeMsgModify                     BeforeConfig `yaml:"beforeMsgModify"` | ||||||
| 	AfterSendGroupMsg                   AfterConfig  `yaml:"afterSendGroupMsg"` | 	AfterSendGroupMsg                   AfterConfig  `yaml:"afterSendGroupMsg"` | ||||||
|  | 	AfterMsgSaveDB                      AfterConfig  `yaml:"afterMsgSaveDB"` | ||||||
| 	AfterUserOnline                     AfterConfig  `yaml:"afterUserOnline"` | 	AfterUserOnline                     AfterConfig  `yaml:"afterUserOnline"` | ||||||
| 	AfterUserOffline                    AfterConfig  `yaml:"afterUserOffline"` | 	AfterUserOffline                    AfterConfig  `yaml:"afterUserOffline"` | ||||||
| 	AfterUserKickOff                    AfterConfig  `yaml:"afterUserKickOff"` | 	AfterUserKickOff                    AfterConfig  `yaml:"afterUserKickOff"` | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user