From 98837edfdd4dc358692678460399efb43bdd798a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 17:35:27 +0800 Subject: [PATCH 1/8] callback --- internal/rpc/msg/callback.go | 4 ++-- pkg/common/http/http_client.go | 16 ++++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index ebea6e66c..969304ccb 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -55,7 +55,7 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err return nil } req := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackAfterSendSingleMsgResp{} @@ -67,7 +67,7 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err return nil } req := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 430d33499..25de275c8 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -10,15 +10,17 @@ import ( "bytes" "context" "encoding/json" - "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "io" "io/ioutil" "net/http" urlLib "net/url" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" ) var client http.Client @@ -86,13 +88,15 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { - return errs.ErrCallbackContinue + log.ZWarn(ctx, "callback failed but continue", err, "url", url, "err", err.Error()) + return nil } return errs.ErrNetwork.Wrap(err.Error()) } if err = json.Unmarshal(b, output); err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { - return errs.ErrCallbackContinue + log.ZWarn(ctx, "callback failed but continue", err, "url", url, "err", err.Error()) + return nil } return errs.ErrData.Wrap(err.Error()) } From 16f215a82bf5f1f2b283eaba5e54616a2f1afe28 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 17:55:41 +0800 Subject: [PATCH 2/8] callback log --- internal/rpc/msg/callback.go | 4 ++-- pkg/common/http/http_client.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 969304ccb..6fb035b81 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -71,7 +71,7 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} - return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) + return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg) } func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { @@ -94,7 +94,7 @@ func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackMsgModifyCommand), } resp := &cbapi.CallbackMsgModifyCommandResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { + if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { return err } if resp.Content != nil { diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 25de275c8..9f4349bb5 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -82,6 +82,7 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input } func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { + log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig) v := urlLib.Values{} v.Set(constant.CallbackCommand, command) url = url + "?" + v.Encode() From 08f9bc705b71031bd909a3fea19b9a0d4608b58b Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 18:40:44 +0800 Subject: [PATCH 3/8] callback --- internal/push/push_to_client.go | 5 ++--- internal/rpc/group/group.go | 10 +++++----- internal/rpc/msg/message_interceptor.go | 26 ------------------------- internal/rpc/msg/send.go | 21 ++++++++++++++------ internal/rpc/msg/server.go | 2 +- 5 files changed, 23 insertions(+), 41 deletions(-) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 31cafb0f0..abc4ade1e 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,7 +18,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" @@ -79,7 +78,7 @@ func (p *Pusher) DismissGroup(ctx context.Context, groupID string) error { func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) // callback - if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue { + if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { return err } // push @@ -132,7 +131,7 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil && err != errs.ErrCallbackContinue { + if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { return err } if len(pushToUserIDs) == 0 { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index cf1c443e6..2fb35c0b3 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -155,7 +155,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR if len(userMap) != len(userIDs) { return nil, errs.ErrUserIDNotFound.Wrap("user not found") } - if err := CallbackBeforeCreateGroup(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackBeforeCreateGroup(ctx, req); err != nil { return nil, err } var groupMembers []*relationTb.GroupMemberModel @@ -173,7 +173,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR groupMember.InviterUserID = mcontext.GetOpUserID(ctx) groupMember.JoinTime = time.Now() groupMember.MuteEndTime = time.Unix(0, 0) - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { return err } groupMembers = append(groupMembers, groupMember) @@ -364,7 +364,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite member.JoinSource = constant.JoinByInvitation member.JoinTime = time.Now() member.MuteEndTime = time.Unix(0, 0) - if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { return nil, err } groupMembers = append(groupMembers, member) @@ -704,7 +704,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup OperatorUserID: mcontext.GetOpUserID(ctx), Ex: groupRequest.Ex, } - if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue { + if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { return nil, err } } @@ -756,7 +756,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) groupMember.InviterUserID = req.InviterUserID groupMember.JoinTime = time.Now() groupMember.MuteEndTime = time.Unix(0, 0) - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { return nil, err } if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil { diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index d142abbbe..702cdc764 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -5,7 +5,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" @@ -30,28 +29,3 @@ func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgDa } return req.MsgData, nil } -func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { - if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { - log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String()) - return nil, err - } - return req.MsgData, nil -} -func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { - switch req.MsgData.SessionType { - case constant.SingleChatType: - if err := callbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { - log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String()) - return nil, err - } - case constant.NotificationChatType: - case constant.SuperGroupChatType: - if err := callbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { - log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String()) - return nil, err - } - default: - return nil, errs.ErrArgs.Wrap("unknown sessionType") - } - return req.MsgData, nil -} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 12c614fea..5c5e303b1 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -18,9 +18,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbMsg.SendMsgReq) (resp *p return nil, errs.ErrMessageHasReadDisable.Wrap() } m.encapsulateMsgData(req.MsgData) - if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { - return nil, err - } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req) @@ -40,12 +37,18 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return nil, err } + if err = callbackBeforeSendGroupMsg(ctx, req); err != nil { + return nil, err + } + if err := callbackMsgModify(ctx, req); err != nil { + return nil, err + } err = m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) if err != nil { return nil, err } if err = callbackAfterSendGroupMsg(ctx, req); err != nil { - log.ZError(ctx, "CallbackAfterSendGroupMsg", err) + log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) resp.SendTime = req.MsgData.SendTime @@ -85,13 +88,19 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter) return nil, errs.ErrUserNotRecvMsg } else { + if err = callbackBeforeSendSingleMsg(ctx, req); err != nil { + return nil, err + } + if err := callbackMsgModify(ctx, req); err != nil { + return nil, err + } if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter) return nil, err } err = callbackAfterSendSingleMsg(ctx, req) - if err != nil && err != errs.ErrCallbackContinue { - return nil, err + if err != nil { + log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req) } resp = &pbMsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 85b247ea3..c2b30191e 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -81,7 +81,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e MessageLocker: NewLockerMessage(cacheModel), } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) - s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback) + s.addInterceptorHandler(MessageHasReadEnabled) s.initPrometheus() msg.RegisterMsgServer(server, s) return nil From 2c0ec630fd9f67dfbaa46d676f4cfd6e3ef5b4c9 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 19:05:06 +0800 Subject: [PATCH 4/8] msg --- internal/rpc/msg/callback.go | 2 ++ internal/rpc/msg/send.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 6fb035b81..bdc8d058a 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -7,6 +7,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -116,5 +117,6 @@ func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { utils.NotNilReplace(&msg.MsgData.AtUserIDList, resp.AtUserIDList) utils.NotNilReplace(&msg.MsgData.AttachedInfo, resp.AttachedInfo) utils.NotNilReplace(&msg.MsgData.Ex, resp.Ex) + log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData) return nil } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 5c5e303b1..89f015315 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -31,7 +31,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbMsg.SendMsgReq) (resp *p } func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { - resp = &pbMsg.SendMsgResp{} promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) if err = m.messageVerification(ctx, req); err != nil { promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) @@ -51,6 +50,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) + resp = &pbMsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime resp.ServerMsgID = req.MsgData.ServerMsgID resp.ClientMsgID = req.MsgData.ClientMsgID From ab99be68e839e84cf9ae28f530bf835c3faf90b5 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 19:19:27 +0800 Subject: [PATCH 5/8] callback log --- internal/rpc/msg/send.go | 1 + pkg/common/http/http_client.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 89f015315..8b2467846 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -72,6 +72,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbMsg.SendMsgR } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { + defer log.ZDebug(ctx, "sendMsgSingleChat return line") promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter) if err := m.messageVerification(ctx, req); err != nil { return nil, err diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 9f4349bb5..1f64c5ef9 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -89,15 +89,15 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { - log.ZWarn(ctx, "callback failed but continue", err, "url", url, "err", err.Error()) - return nil + log.ZWarn(ctx, "callback failed but continue", err, "url", url) + return errs.ErrCallbackContinue } return errs.ErrNetwork.Wrap(err.Error()) } if err = json.Unmarshal(b, output); err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { - log.ZWarn(ctx, "callback failed but continue", err, "url", url, "err", err.Error()) - return nil + log.ZWarn(ctx, "callback failed but continue", err, "url", url) + return errs.ErrCallbackContinue } return errs.ErrData.Wrap(err.Error()) } From 2637e81fa110ee29e4ba00e731ad9e289b48d9d0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 19:21:40 +0800 Subject: [PATCH 6/8] callback --- internal/rpc/msg/send.go | 1 - pkg/common/http/http_client.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 8b2467846..89f015315 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -72,7 +72,6 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbMsg.SendMsgR } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { - defer log.ZDebug(ctx, "sendMsgSingleChat return line") promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter) if err := m.messageVerification(ctx, req); err != nil { return nil, err diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 1f64c5ef9..2227a3438 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -82,7 +82,7 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input } func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { - log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig) + defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig) v := urlLib.Values{} v.Set(constant.CallbackCommand, command) url = url + "?" + v.Encode() From 3c0288009843b053f49d72120e1df5900d11e7b8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 19:30:40 +0800 Subject: [PATCH 7/8] ctx --- pkg/common/http/http_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 2227a3438..482a9d680 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -87,6 +87,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac v.Set(constant.CallbackCommand, command) url = url + "?" + v.Encode() b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) + log.ZError(ctx, "post", err, "b", string(b)) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { log.ZWarn(ctx, "callback failed but continue", err, "url", url) From 931991f92697a26094f8cf45ca5fac7ef6db35c1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Jun 2023 19:44:45 +0800 Subject: [PATCH 8/8] callback --- internal/push/callback.go | 11 ++++++++-- internal/rpc/friend/callback.go | 13 ++++++++---- internal/rpc/group/callback.go | 10 +++++++++ internal/rpc/msg/callback.go | 36 +++++++++++++++++++++++++++++---- pkg/common/http/http_client.go | 1 - 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/internal/push/callback.go b/internal/push/callback.go index ddd1a2cd4..db9f36c48 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -2,11 +2,13 @@ package push import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) @@ -39,8 +41,10 @@ func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDa Content: utils.GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush) - if err != nil { + if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if len(resp.UserIDs) != 0 { @@ -100,6 +104,9 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if len(resp.UserIDs) != 0 { diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go index 60b978d2e..c36e743c5 100644 --- a/internal/rpc/friend/callback.go +++ b/internal/rpc/friend/callback.go @@ -2,17 +2,17 @@ package friend import ( "context" + cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" pbfriend "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend" ) func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error { - log.ZInfo(ctx, "CallbackBeforeAddFriend", "in", req) if !config.Config.Callback.CallbackBeforeAddFriend.Enable { return nil } @@ -24,6 +24,11 @@ func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriend OperationID: mcontext.GetOperationID(ctx), } resp := &cbapi.CallbackBeforeAddFriendResp{} - defer log.ZInfo(ctx, "CallbackBeforeAddFriend", "out", &resp) - return http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend) + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } + return err + } + return nil } diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index 31756914a..aa311e4e5 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -11,6 +11,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -44,6 +45,9 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( resp := &callbackstruct.CallbackBeforeCreateGroupResp{} err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup) if err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) @@ -76,6 +80,9 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, groupMember *relation.Gr resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{} err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeMemberJoinGroup) if err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if resp.MuteEndTime != nil { @@ -113,6 +120,9 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{} err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo) if err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if resp.FaceURL != nil { diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index bdc8d058a..aac96f75c 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -9,6 +9,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) @@ -48,7 +49,13 @@ func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) er RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackBeforeSendSingleMsgResp{} - return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg) + if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } + return err + } + return nil } func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { @@ -60,7 +67,13 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackAfterSendSingleMsgResp{} - return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) + if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } + return err + } + return nil } func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { @@ -72,7 +85,13 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} - return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg) + if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } + return err + } + return nil } func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { @@ -84,7 +103,13 @@ func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) erro GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackAfterSendGroupMsgResp{} - return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) + if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } + return err + } + return nil } func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { @@ -96,6 +121,9 @@ func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { } resp := &cbapi.CallbackMsgModifyCommandResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if resp.Content != nil { diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 482a9d680..2227a3438 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -87,7 +87,6 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac v.Set(constant.CallbackCommand, command) url = url + "?" + v.Encode() b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) - log.ZError(ctx, "post", err, "b", string(b)) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { log.ZWarn(ctx, "callback failed but continue", err, "url", url)