diff --git a/config/webhooks.yml b/config/webhooks.yml index 854d2dc2c..41c60e7e2 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -3,14 +3,7 @@ beforeSendSingleMsg: enable: false timeout: 5 failedContinue: true - # Only the contentType in allowedTypes will send the callback. - # Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-"). - # e.g. allowedTypes: [1, 100, 200-500, 600-700] means that only contentType within the range - # {1, 100} ∪ [200, 500] ∪ [600, 700] will be allowed through the filter. - # If not set, all contentType messages will through this filter. - allowedTypes: [] # Only the contentType not in deniedTypes will send the callback. - # Supports two formats, same as allowedTypes. # If not set, all contentType messages will through this filter. deniedTypes: [] beforeUpdateUserInfoEx: @@ -23,31 +16,30 @@ afterUpdateUserInfoEx: afterSendSingleMsg: enable: false timeout: 5 - # Only the senID/recvID specified in attentionIds will send the callback + # Only the recvID specified in attentionIds will send the callback # if not set, all user messages will be callback attentionIds: [] # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] beforeSendGroupMsg: enable: false timeout: 5 failedContinue: true # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] beforeMsgModify: enable: false timeout: 5 failedContinue: true # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] afterSendGroupMsg: enable: false timeout: 5 + # Only the recvID specified in attentionIds will send the callback + # if not set, all user messages will be callback + attentionIds: [] # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] afterUserOnline: enable: false diff --git a/go.mod b/go.mod index 9a66d7cea..960bc56ce 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.72 + github.com/openimsdk/protocol v0.0.72-alpha.74 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6bd4ecde0..63dfe9236 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY= -github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.74 h1:cGycdzEOxjPuaeoQhIWEKKVf5zp1I+wx7ZnBemjCJJI= +github.com/openimsdk/protocol v0.0.72-alpha.74/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/msg.go b/internal/api/msg.go index 0b73af0cb..5503527a5 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -15,12 +15,16 @@ package api import ( + "encoding/base64" + "encoding/json" + "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -368,6 +372,83 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { apiresp.GinSuccess(c, resp) } +func (m *MessageApi) SendSimpleMessage(c *gin.Context) { + encodedKey, ok := c.GetQuery(webhook.Key) + if !ok { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap()) + return + } + + decodedData, err := base64.StdEncoding.DecodeString(encodedKey) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var ( + req apistruct.SendSingleMsgReq + keyMsgData apistruct.KeyMsgData + + sendID string + sessionType int32 + recvID string + ) + err = json.Unmarshal(decodedData, &keyMsgData) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + if keyMsgData.GroupID != "" { + sessionType = constant.ReadGroupChatType + sendID = req.SendID + } else { + sessionType = constant.SingleChatType + sendID = keyMsgData.RecvID + recvID = keyMsgData.SendID + } + // check param + if keyMsgData.SendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap()) + return + } + if sendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap()) + return + } + + msgData := &sdkws.MsgData{ + SendID: sendID, + RecvID: recvID, + GroupID: keyMsgData.GroupID, + ClientMsgID: idutil.GetMsgIDByMD5(sendID), + SenderPlatformID: constant.AdminPlatformID, + SessionType: sessionType, + MsgFrom: constant.UserMsgType, + ContentType: constant.Text, + Content: []byte(req.Content), + OfflinePushInfo: req.OfflinePushInfo, + Ex: req.Ex, + } + + respPb, err := m.Client.SendMsg(c, &msg.SendMsgReq{MsgData: msgData}) + if err != nil { + apiresp.GinError(c, err) + return + } + + var status = constant.MsgSendSuccessed + + _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ + Status: int32(status), + }) + + if err != nil { + apiresp.GinError(c, err) + return + } + + apiresp.GinSuccess(c, respPb) +} + func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index c66dd6ca9..adf1ff735 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,7 +16,11 @@ package msg import ( "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -94,7 +98,7 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after) + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -128,7 +132,8 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) + + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -192,3 +197,15 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft } m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/filter.go b/internal/rpc/msg/filter.go index ed1a488f1..36511ec7b 100644 --- a/internal/rpc/msg/filter.go +++ b/internal/rpc/msg/filter.go @@ -1,11 +1,13 @@ package msg import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - pbchat "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/tools/utils/datautil" "strconv" "strings" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/constant" + pbchat "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/tools/utils/datautil" ) const ( @@ -13,28 +15,50 @@ const ( ) func filterAfterMsg(msg *pbchat.SendMsgReq, after *config.AfterConfig) bool { - return filterMsg(msg, after.AttentionIds, after.AllowedTypes, after.DeniedTypes) + return filterMsg(msg, after.AttentionIds, after.DeniedTypes) } func filterBeforeMsg(msg *pbchat.SendMsgReq, before *config.BeforeConfig) bool { - return filterMsg(msg, nil, before.AllowedTypes, before.DeniedTypes) + return filterMsg(msg, nil, before.DeniedTypes) } -func filterMsg(msg *pbchat.SendMsgReq, attentionIds, allowedTypes, deniedTypes []string) bool { +func filterMsg(msg *pbchat.SendMsgReq, attentionIds []string, deniedTypes []int32) bool { // According to the attentionIds configuration, only some users are sent - if len(attentionIds) != 0 && !datautil.Contains([]string{msg.MsgData.SendID, msg.MsgData.RecvID}, attentionIds...) { + if len(attentionIds) != 0 && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) { return false } - if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) { + + if defaultDeniedTypes(msg.MsgData.ContentType) { return false } - if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) { + + if len(deniedTypes) != 0 && datautil.Contain(msg.MsgData.ContentType, deniedTypes...) { return false } + //if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) { + // return false + //} + //if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) { + // return false + //} return true } -func isInInterval(contentType int32, interval []string) bool { +func defaultDeniedTypes(contentType int32) bool { + if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { + return true + } + if contentType == constant.Typing { + return true + } + return false +} + +// isInInterval if data is in interval +// Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-") +// e.g. [1, 100, 200-500, 600-700] means that only data within the range +// {1, 100} ∪ [200, 500] ∪ [600, 700] will return true. +func isInInterval(data int32, interval []string) bool { for _, v := range interval { if strings.Contains(v, separator) { // is interval @@ -50,7 +74,7 @@ func isInInterval(contentType int32, interval []string) bool { if err != nil { continue } - if datautil.BetweenEq(int(contentType), bottom, top) { + if datautil.BetweenEq(int(data), bottom, top) { return true } } else { @@ -58,7 +82,7 @@ func isInInterval(contentType int32, interval []string) bool { if err != nil { continue } - if int(contentType) == iv { + if int(data) == iv { return true } } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 15e93a988..d4fe7ecc4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -566,7 +566,7 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser. } // Convert users to response format - resp := s.userModelToResp(users, req.Pagination) + resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel) if resp.Total != 0 { return resp, nil } @@ -576,17 +576,24 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser. if err != nil { return nil, err } - resp = s.userModelToResp(users, req.Pagination) + resp = s.userModelToResp(users, req.Pagination, req.AppManagerLevel) return resp, nil } // If no keyword, find users with notification settings - users, err = s.db.FindNotification(ctx, constant.AppNotificationAdmin) - if err != nil { - return nil, err + if req.AppManagerLevel != nil { + users, err = s.db.FindNotification(ctx, int64(*req.AppManagerLevel)) + if err != nil { + return nil, err + } + } else { + users, err = s.db.FindSystemAccount(ctx) + if err != nil { + return nil, err + } } - resp := s.userModelToResp(users, req.Pagination) + resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel) return resp, nil } @@ -625,11 +632,16 @@ func (s *userServer) genUserID() string { return string(data) } -func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination) *pbuser.SearchNotificationAccountResp { +func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination, appManagerLevel *int32) *pbuser.SearchNotificationAccountResp { accounts := make([]*pbuser.NotificationAccountInfo, 0) var total int64 for _, v := range users { if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) { + if appManagerLevel != nil { + if v.AppMangerLevel != *appManagerLevel { + continue + } + } temp := &pbuser.NotificationAccountInfo{ UserID: v.UserID, FaceURL: v.FaceURL, diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index f4deb9fb1..4f1d5863f 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -111,6 +111,21 @@ type BatchSendMsgResp struct { FailedIDs []string `json:"failedUserIDs"` } +// SendSingleMsgReq defines the structure for sending a message to multiple recipients. +type SendSingleMsgReq struct { + // groupMsg should appoint sendID + SendID string `json:"sendID"` + Content string `json:"content" binding:"required"` + OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` + Ex string `json:"ex"` +} + +type KeyMsgData struct { + SendID string `json:"sendID"` + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + // SingleReturnResult encapsulates the result of a single message send attempt. type SingleReturnResult struct { // ServerMsgID is the message identifier on the server-side. diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 275dec70f..eb38b64ac 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -364,19 +364,17 @@ type Redis struct { } type BeforeConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - FailedContinue bool `mapstructure:"failedContinue"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + FailedContinue bool `yaml:"failedContinue"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type AfterConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - AttentionIds []string `mapstructure:"attentionIds"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + AttentionIds []string `yaml:"attentionIds"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type Share struct { diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go index 3f34481a3..a8ef1033e 100644 --- a/pkg/common/storage/controller/user.go +++ b/pkg/common/storage/controller/user.go @@ -20,6 +20,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/utils/datautil" @@ -37,8 +38,10 @@ type UserDatabase interface { Find(ctx context.Context, userIDs []string) (users []*model.User, err error) // Find userInfo By Nickname FindByNickname(ctx context.Context, nickname string) (users []*model.User, err error) - // Find notificationAccounts + // FindNotification find system account by level FindNotification(ctx context.Context, level int64) (users []*model.User, err error) + // FindSystemAccount find all system account + FindSystemAccount(ctx context.Context) (users []*model.User, err error) // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage Create(ctx context.Context, users []*model.User) (err error) // UpdateByMap update (zero value) external guarantee userID exists @@ -139,6 +142,10 @@ func (u *userDatabase) FindNotification(ctx context.Context, level int64) (users return u.userDB.TakeNotification(ctx, level) } +func (u *userDatabase) FindSystemAccount(ctx context.Context) (users []*model.User, err error) { + return u.userDB.TakeGTEAppManagerLevel(ctx, constant.AppNotificationAdmin) +} + // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage. func (u *userDatabase) Create(ctx context.Context, users []*model.User) (err error) { return u.tx.Transaction(ctx, func(ctx context.Context) error { diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index ee92b7554..a08765baf 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -16,9 +16,10 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/db/mongoutil" @@ -71,6 +72,10 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": level}) } +func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) { + return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}}) +} + func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) { return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"nickname": nickname}) } diff --git a/pkg/common/storage/database/user.go b/pkg/common/storage/database/user.go index 4ddc8285f..c597424b9 100644 --- a/pkg/common/storage/database/user.go +++ b/pkg/common/storage/database/user.go @@ -16,10 +16,11 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/db/pagination" - "time" ) type User interface { @@ -28,6 +29,7 @@ type User interface { Find(ctx context.Context, userIDs []string) (users []*model.User, err error) Take(ctx context.Context, userID string) (user *model.User, err error) TakeNotification(ctx context.Context, level int64) (user []*model.User, err error) + TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*model.User, err error) PageFindUser(ctx context.Context, level1 int64, level2 int64, pagination pagination.Pagination) (count int64, users []*model.User, err error) diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index e46f08806..0cd13f6e2 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -17,6 +17,9 @@ package webhook import ( "context" "encoding/json" + "net/http" + "net/url" + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -25,7 +28,6 @@ import ( "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/utils/httputil" - "net/http" ) type Client struct { @@ -37,6 +39,8 @@ type Client struct { const ( webhookWorkerCount = 2 webhookBufferSize = 100 + + Key = "key" ) func NewWebhookClient(url string, options ...*memamq.MemoryQueue) *Client { @@ -66,6 +70,12 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru } } +func (c *Client) AsyncPostWithQuery(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig, queryParams map[string]string) { + if after.Enable { + c.queue.Push(func() { c.postWithQuery(ctx, command, req, resp, after.Timeout, queryParams) }) + } +} + func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error { ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) fullURL := c.url + "/" + command @@ -84,3 +94,41 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) return nil } + +func (c *Client) postWithQuery(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int, queryParams map[string]string) error { + ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + fullURL := c.url + "/" + command + + parsedURL, err := url.Parse(fullURL) + if err != nil { + return servererrs.ErrNetwork.WrapMsg(err.Error(), "failed to parse URL", fullURL) + } + + query := parsedURL.Query() + + operationID, _ := ctx.Value(constant.OperationID).(string) + + for key, value := range queryParams { + query.Set(key, value) + } + + parsedURL.RawQuery = query.Encode() + + fullURL = parsedURL.String() + log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) + + b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout) + if err != nil { + return servererrs.ErrNetwork.WrapMsg(err.Error(), "post url", fullURL) + } + + if err = json.Unmarshal(b, output); err != nil { + return servererrs.ErrData.WithDetail(err.Error() + " response format error") + } + if err := output.Parse(); err != nil { + return err + } + + log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) + return nil +}