mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-03 18:52:15 +08:00
feat add after filter support on sendMsg
This commit is contained in:
parent
0f7825454c
commit
7a6568a4fe
@ -4,14 +4,19 @@ import (
|
|||||||
pbChat "Open_IM/pkg/proto/chat"
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ context.Context = (*SendContext)(nil)
|
||||||
|
|
||||||
// SendContext is the most important part of RPC SendMsg. It allows us to pass variables between middleware
|
// SendContext is the most important part of RPC SendMsg. It allows us to pass variables between middleware
|
||||||
type SendContext struct {
|
type SendContext struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
rpc *rpcChat
|
rpc *rpcChat
|
||||||
// beforeFilters are filters which will be triggered before send msg
|
// beforeFilters are filters which will be triggered before send msg
|
||||||
beforeFilters []BeforeSendFilter
|
beforeFilters []BeforeSendFilter
|
||||||
|
// afterSenders are filters which will be triggered after send msg
|
||||||
|
afterSenders []AfterSendFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext {
|
func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext {
|
||||||
@ -19,22 +24,10 @@ func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext {
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
rpc: rpc,
|
rpc: rpc,
|
||||||
beforeFilters: rpc.beforeSenders,
|
beforeFilters: rpc.beforeSenders,
|
||||||
|
afterSenders: rpc.afterSenders,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SendContext) SetCtx(ctx context.Context) {
|
|
||||||
c.ctx = ctx
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SendContext) Value(key interface{}) interface{} {
|
|
||||||
return c.ctx.Value(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SendContext) WithValue(key, val interface{}) {
|
|
||||||
ctx := context.WithValue(c.ctx, key, val)
|
|
||||||
c.SetCtx(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// doBeforeFilters executes the pending filters in the chain inside the calling handler.
|
// doBeforeFilters executes the pending filters in the chain inside the calling handler.
|
||||||
func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
||||||
for _, handler := range c.beforeFilters {
|
for _, handler := range c.beforeFilters {
|
||||||
@ -50,6 +43,21 @@ func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgRes
|
|||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// doAfterFilters executes the pending filters in the chain inside the calling handler.
|
||||||
|
func (c *SendContext) doAfterFilters(req *pbChat.SendMsgReq, res *pbChat.SendMsgResp) (*pbChat.SendMsgResp, bool, error) {
|
||||||
|
for _, handler := range c.afterSenders {
|
||||||
|
res, ok, err := handler(c, req, res)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return res, ok, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
||||||
replay := pbChat.SendMsgResp{}
|
replay := pbChat.SendMsgResp{}
|
||||||
res, ok, err := c.doBeforeFilters(pb)
|
res, ok, err := c.doBeforeFilters(pb)
|
||||||
@ -67,5 +75,54 @@ func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error
|
|||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
res, ok, err = c.doAfterFilters(pb, res)
|
||||||
|
if err != nil {
|
||||||
|
return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) SetCtx(ctx context.Context) {
|
||||||
|
c.ctx = ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) WithValue(key, val interface{}) {
|
||||||
|
ctx := context.WithValue(c.ctx, key, val)
|
||||||
|
c.SetCtx(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/************************************/
|
||||||
|
/***** context *****/
|
||||||
|
/************************************/
|
||||||
|
|
||||||
|
// Deadline returns that there is no deadline (ok==false) when c has no Context.
|
||||||
|
func (c *SendContext) Deadline() (deadline time.Time, ok bool) {
|
||||||
|
if c.ctx == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return c.ctx.Deadline()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done returns nil (chan which will wait forever) when c has no Context.
|
||||||
|
func (c *SendContext) Done() <-chan struct{} {
|
||||||
|
if c.ctx == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.ctx.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns nil when c has no Context.
|
||||||
|
func (c *SendContext) Err() error {
|
||||||
|
if c.ctx == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) Value(key interface{}) interface{} {
|
||||||
|
return c.ctx.Value(key)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,3 +4,6 @@ import pbChat "Open_IM/pkg/proto/chat"
|
|||||||
|
|
||||||
// BeforeSendFilter handles custom logic before send msg.
|
// BeforeSendFilter handles custom logic before send msg.
|
||||||
type BeforeSendFilter func(ctx *SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)
|
type BeforeSendFilter func(ctx *SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)
|
||||||
|
|
||||||
|
// AfterSendFilter handles custom logic after send msg.
|
||||||
|
type AfterSendFilter func(ctx *SendContext, req *pbChat.SendMsgReq, res *pbChat.SendMsgResp) (*pbChat.SendMsgResp, bool, error)
|
||||||
|
|||||||
@ -23,6 +23,9 @@ type rpcChat struct {
|
|||||||
producer *kafka.Producer
|
producer *kafka.Producer
|
||||||
// beforeSenders are filters which will be triggered before send msg
|
// beforeSenders are filters which will be triggered before send msg
|
||||||
beforeSenders []BeforeSendFilter
|
beforeSenders []BeforeSendFilter
|
||||||
|
|
||||||
|
// afterSenders are filters which will be triggered after send msg
|
||||||
|
afterSenders []AfterSendFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRpcChatServer(port int) *rpcChat {
|
func NewRpcChatServer(port int) *rpcChat {
|
||||||
@ -42,6 +45,11 @@ func (rpc *rpcChat) UseBeforSendFilters(hs ...BeforeSendFilter) {
|
|||||||
rpc.beforeSenders = append(rpc.beforeSenders, hs...)
|
rpc.beforeSenders = append(rpc.beforeSenders, hs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseAfterSendFilters attaches a global filter to the AfterSendFilter logic
|
||||||
|
func (rpc *rpcChat) UseAfterSendFilters(hs ...AfterSendFilter) {
|
||||||
|
rpc.afterSenders = append(rpc.afterSenders, hs...)
|
||||||
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) Run() {
|
func (rpc *rpcChat) Run() {
|
||||||
log.Info("", "", "rpc get_token init...")
|
log.Info("", "", "rpc get_token init...")
|
||||||
|
|
||||||
|
|||||||
@ -1,10 +0,0 @@
|
|||||||
package widget
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
pbChat "Open_IM/pkg/proto/chat"
|
|
||||||
)
|
|
||||||
|
|
||||||
// BeforeSendHandler handles custom logic before send msg.
|
|
||||||
type BeforeSendHandler func(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)
|
|
||||||
Loading…
x
Reference in New Issue
Block a user