mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 02:16:16 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
8749e52809
@ -2,11 +2,13 @@ package push
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
)
|
)
|
||||||
@ -39,8 +41,10 @@ func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDa
|
|||||||
Content: utils.GetContent(msg),
|
Content: utils.GetContent(msg),
|
||||||
}
|
}
|
||||||
resp := &callbackstruct.CallbackBeforePushResp{}
|
resp := &callbackstruct.CallbackBeforePushResp{}
|
||||||
err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush)
|
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil {
|
||||||
if err != nil {
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(resp.UserIDs) != 0 {
|
if len(resp.UserIDs) != 0 {
|
||||||
@ -100,6 +104,9 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg
|
|||||||
}
|
}
|
||||||
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
|
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
|
||||||
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
|
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(resp.UserIDs) != 0 {
|
if len(resp.UserIDs) != 0 {
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
@ -79,7 +78,7 @@ func (p *Pusher) DismissGroup(ctx context.Context, groupID string) error {
|
|||||||
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
|
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
|
||||||
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
||||||
// callback
|
// callback
|
||||||
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue {
|
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// push
|
// push
|
||||||
@ -132,7 +131,7 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
|
|||||||
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
||||||
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
||||||
var pushToUserIDs []string
|
var pushToUserIDs []string
|
||||||
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil && err != errs.ErrCallbackContinue {
|
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(pushToUserIDs) == 0 {
|
if len(pushToUserIDs) == 0 {
|
||||||
|
@ -2,17 +2,17 @@ package friend
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
pbfriend "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend"
|
pbfriend "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error {
|
func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error {
|
||||||
log.ZInfo(ctx, "CallbackBeforeAddFriend", "in", req)
|
|
||||||
if !config.Config.Callback.CallbackBeforeAddFriend.Enable {
|
if !config.Config.Callback.CallbackBeforeAddFriend.Enable {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -24,6 +24,11 @@ func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriend
|
|||||||
OperationID: mcontext.GetOperationID(ctx),
|
OperationID: mcontext.GetOperationID(ctx),
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackBeforeAddFriendResp{}
|
resp := &cbapi.CallbackBeforeAddFriendResp{}
|
||||||
defer log.ZInfo(ctx, "CallbackBeforeAddFriend", "out", &resp)
|
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil {
|
||||||
return http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend)
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
@ -44,6 +45,9 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (
|
|||||||
resp := &callbackstruct.CallbackBeforeCreateGroupResp{}
|
resp := &callbackstruct.CallbackBeforeCreateGroupResp{}
|
||||||
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup)
|
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID)
|
utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID)
|
||||||
@ -76,6 +80,9 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, groupMember *relation.Gr
|
|||||||
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
|
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
|
||||||
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeMemberJoinGroup)
|
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeMemberJoinGroup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.MuteEndTime != nil {
|
if resp.MuteEndTime != nil {
|
||||||
@ -113,6 +120,9 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe
|
|||||||
resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{}
|
resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{}
|
||||||
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo)
|
err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.FaceURL != nil {
|
if resp.FaceURL != nil {
|
||||||
|
@ -155,7 +155,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
if len(userMap) != len(userIDs) {
|
if len(userMap) != len(userIDs) {
|
||||||
return nil, errs.ErrUserIDNotFound.Wrap("user not found")
|
return nil, errs.ErrUserIDNotFound.Wrap("user not found")
|
||||||
}
|
}
|
||||||
if err := CallbackBeforeCreateGroup(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeCreateGroup(ctx, req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var groupMembers []*relationTb.GroupMemberModel
|
var groupMembers []*relationTb.GroupMemberModel
|
||||||
@ -173,7 +173,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
groupMember.InviterUserID = mcontext.GetOpUserID(ctx)
|
||||||
groupMember.JoinTime = time.Now()
|
groupMember.JoinTime = time.Now()
|
||||||
groupMember.MuteEndTime = time.Unix(0, 0)
|
groupMember.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
groupMembers = append(groupMembers, groupMember)
|
groupMembers = append(groupMembers, groupMember)
|
||||||
@ -364,7 +364,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
|||||||
member.JoinSource = constant.JoinByInvitation
|
member.JoinSource = constant.JoinByInvitation
|
||||||
member.JoinTime = time.Now()
|
member.JoinTime = time.Now()
|
||||||
member.MuteEndTime = time.Unix(0, 0)
|
member.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
groupMembers = append(groupMembers, member)
|
groupMembers = append(groupMembers, member)
|
||||||
@ -704,7 +704,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
|
|||||||
OperatorUserID: mcontext.GetOpUserID(ctx),
|
OperatorUserID: mcontext.GetOpUserID(ctx),
|
||||||
Ex: groupRequest.Ex,
|
Ex: groupRequest.Ex,
|
||||||
}
|
}
|
||||||
if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -756,7 +756,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
|
|||||||
groupMember.InviterUserID = req.InviterUserID
|
groupMember.InviterUserID = req.InviterUserID
|
||||||
groupMember.JoinTime = time.Now()
|
groupMember.JoinTime = time.Now()
|
||||||
groupMember.MuteEndTime = time.Unix(0, 0)
|
groupMember.MuteEndTime = time.Unix(0, 0)
|
||||||
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue {
|
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil {
|
if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil {
|
||||||
|
@ -7,7 +7,9 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
)
|
)
|
||||||
@ -47,7 +49,13 @@ func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) er
|
|||||||
RecvID: msg.MsgData.RecvID,
|
RecvID: msg.MsgData.RecvID,
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackBeforeSendSingleMsgResp{}
|
resp := &cbapi.CallbackBeforeSendSingleMsgResp{}
|
||||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg)
|
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||||
@ -55,11 +63,17 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := &cbapi.CallbackAfterSendSingleMsgReq{
|
req := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendSingleMsgCommand),
|
||||||
RecvID: msg.MsgData.RecvID,
|
RecvID: msg.MsgData.RecvID,
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackAfterSendSingleMsgResp{}
|
resp := &cbapi.CallbackAfterSendSingleMsgResp{}
|
||||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
|
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||||
@ -67,11 +81,17 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := &cbapi.CallbackAfterSendGroupMsgReq{
|
req := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendGroupMsgCommand),
|
||||||
GroupID: msg.MsgData.GroupID,
|
GroupID: msg.MsgData.GroupID,
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackBeforeSendGroupMsgResp{}
|
resp := &cbapi.CallbackBeforeSendGroupMsgResp{}
|
||||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
|
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||||
@ -83,7 +103,13 @@ func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) erro
|
|||||||
GroupID: msg.MsgData.GroupID,
|
GroupID: msg.MsgData.GroupID,
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackAfterSendGroupMsgResp{}
|
resp := &cbapi.CallbackAfterSendGroupMsgResp{}
|
||||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||||
@ -94,7 +120,10 @@ func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
|||||||
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackMsgModifyCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackMsgModifyCommand),
|
||||||
}
|
}
|
||||||
resp := &cbapi.CallbackMsgModifyCommandResp{}
|
resp := &cbapi.CallbackMsgModifyCommandResp{}
|
||||||
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil {
|
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil {
|
||||||
|
if err == errs.ErrCallbackContinue {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.Content != nil {
|
if resp.Content != nil {
|
||||||
@ -116,5 +145,6 @@ func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
|||||||
utils.NotNilReplace(&msg.MsgData.AtUserIDList, resp.AtUserIDList)
|
utils.NotNilReplace(&msg.MsgData.AtUserIDList, resp.AtUserIDList)
|
||||||
utils.NotNilReplace(&msg.MsgData.AttachedInfo, resp.AttachedInfo)
|
utils.NotNilReplace(&msg.MsgData.AttachedInfo, resp.AttachedInfo)
|
||||||
utils.NotNilReplace(&msg.MsgData.Ex, resp.Ex)
|
utils.NotNilReplace(&msg.MsgData.Ex, resp.Ex)
|
||||||
|
log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
@ -30,28 +29,3 @@ func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgDa
|
|||||||
}
|
}
|
||||||
return req.MsgData, nil
|
return req.MsgData, nil
|
||||||
}
|
}
|
||||||
func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
|
|
||||||
if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
|
||||||
log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return req.MsgData, nil
|
|
||||||
}
|
|
||||||
func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
|
|
||||||
switch req.MsgData.SessionType {
|
|
||||||
case constant.SingleChatType:
|
|
||||||
if err := callbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
|
||||||
log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
case constant.NotificationChatType:
|
|
||||||
case constant.SuperGroupChatType:
|
|
||||||
if err := callbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
|
||||||
log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return nil, errs.ErrArgs.Wrap("unknown sessionType")
|
|
||||||
}
|
|
||||||
return req.MsgData, nil
|
|
||||||
}
|
|
||||||
|
@ -18,9 +18,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbMsg.SendMsgReq) (resp *p
|
|||||||
return nil, errs.ErrMessageHasReadDisable.Wrap()
|
return nil, errs.ErrMessageHasReadDisable.Wrap()
|
||||||
}
|
}
|
||||||
m.encapsulateMsgData(req.MsgData)
|
m.encapsulateMsgData(req.MsgData)
|
||||||
if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
switch req.MsgData.SessionType {
|
switch req.MsgData.SessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
return m.sendMsgSingleChat(ctx, req)
|
return m.sendMsgSingleChat(ctx, req)
|
||||||
@ -34,20 +31,26 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbMsg.SendMsgReq) (resp *p
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) {
|
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) {
|
||||||
resp = &pbMsg.SendMsgResp{}
|
|
||||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
|
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
|
||||||
if err = m.messageVerification(ctx, req); err != nil {
|
if err = m.messageVerification(ctx, req); err != nil {
|
||||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
|
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if err = callbackBeforeSendGroupMsg(ctx, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := callbackMsgModify(ctx, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
err = m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
|
err = m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
|
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
|
||||||
log.ZError(ctx, "CallbackAfterSendGroupMsg", err)
|
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
|
||||||
}
|
}
|
||||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
|
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
|
||||||
|
resp = &pbMsg.SendMsgResp{}
|
||||||
resp.SendTime = req.MsgData.SendTime
|
resp.SendTime = req.MsgData.SendTime
|
||||||
resp.ServerMsgID = req.MsgData.ServerMsgID
|
resp.ServerMsgID = req.MsgData.ServerMsgID
|
||||||
resp.ClientMsgID = req.MsgData.ClientMsgID
|
resp.ClientMsgID = req.MsgData.ClientMsgID
|
||||||
@ -85,13 +88,19 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq
|
|||||||
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
|
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
|
||||||
return nil, errs.ErrUserNotRecvMsg
|
return nil, errs.ErrUserNotRecvMsg
|
||||||
} else {
|
} else {
|
||||||
|
if err = callbackBeforeSendSingleMsg(ctx, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := callbackMsgModify(ctx, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
||||||
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
|
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = callbackAfterSendSingleMsg(ctx, req)
|
err = callbackAfterSendSingleMsg(ctx, req)
|
||||||
if err != nil && err != errs.ErrCallbackContinue {
|
if err != nil {
|
||||||
return nil, err
|
log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req)
|
||||||
}
|
}
|
||||||
resp = &pbMsg.SendMsgResp{
|
resp = &pbMsg.SendMsgResp{
|
||||||
ServerMsgID: req.MsgData.ServerMsgID,
|
ServerMsgID: req.MsgData.ServerMsgID,
|
||||||
|
@ -81,7 +81,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
|||||||
MessageLocker: NewLockerMessage(cacheModel),
|
MessageLocker: NewLockerMessage(cacheModel),
|
||||||
}
|
}
|
||||||
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
|
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
|
||||||
s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback)
|
s.addInterceptorHandler(MessageHasReadEnabled)
|
||||||
s.initPrometheus()
|
s.initPrometheus()
|
||||||
msg.RegisterMsgServer(server, s)
|
msg.RegisterMsgServer(server, s)
|
||||||
return nil
|
return nil
|
||||||
|
@ -10,15 +10,17 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
urlLib "net/url"
|
urlLib "net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
)
|
)
|
||||||
|
|
||||||
var client http.Client
|
var client http.Client
|
||||||
@ -80,18 +82,21 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
|
|||||||
}
|
}
|
||||||
|
|
||||||
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
|
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
|
||||||
|
defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig)
|
||||||
v := urlLib.Values{}
|
v := urlLib.Values{}
|
||||||
v.Set(constant.CallbackCommand, command)
|
v.Set(constant.CallbackCommand, command)
|
||||||
url = url + "?" + v.Encode()
|
url = url + "?" + v.Encode()
|
||||||
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
|
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
|
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
|
||||||
|
log.ZWarn(ctx, "callback failed but continue", err, "url", url)
|
||||||
return errs.ErrCallbackContinue
|
return errs.ErrCallbackContinue
|
||||||
}
|
}
|
||||||
return errs.ErrNetwork.Wrap(err.Error())
|
return errs.ErrNetwork.Wrap(err.Error())
|
||||||
}
|
}
|
||||||
if err = json.Unmarshal(b, output); err != nil {
|
if err = json.Unmarshal(b, output); err != nil {
|
||||||
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
|
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
|
||||||
|
log.ZWarn(ctx, "callback failed but continue", err, "url", url)
|
||||||
return errs.ErrCallbackContinue
|
return errs.ErrCallbackContinue
|
||||||
}
|
}
|
||||||
return errs.ErrData.Wrap(err.Error())
|
return errs.ErrData.Wrap(err.Error())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user