mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-04 11:22:10 +08:00
feat support rpc send msg filter: before send
This commit is contained in:
parent
ba5b7c0e7e
commit
d7ba884ca2
39
cmd/rpc/open_im_msg/filters/mock.go
Normal file
39
cmd/rpc/open_im_msg/filters/mock.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
rpcChat "Open_IM/internal/rpc/msg"
|
||||||
|
"Open_IM/pkg/common/constant"
|
||||||
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MockBeforeSendFilter1(ctx *rpcChat.SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
||||||
|
ctxKey := "test_key"
|
||||||
|
v := true
|
||||||
|
fmt.Printf("MockBeforeSendFilter1:%s set value to ctx,value is :%v\n", ctxKey, v)
|
||||||
|
ctx.WithValue(ctxKey, v)
|
||||||
|
|
||||||
|
return nil, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockBeforeSendFilter is a mock handle that handles custom logic before send msg.
|
||||||
|
func MockBeforeSendFilter2(ctx *rpcChat.SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
||||||
|
ctxKey := "test_key"
|
||||||
|
v, ok := ctx.Value(ctxKey).(bool)
|
||||||
|
if ok {
|
||||||
|
fmt.Printf("MockBeforeSendFilter2:%s selected from ctx,value is :%v\n", ctxKey, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("MockBeforeSendHandler trigger,contentType:%d\n", pb.MsgData.GetContentType())
|
||||||
|
if pb.MsgData.ContentType == constant.Text {
|
||||||
|
msg := string(pb.MsgData.Content)
|
||||||
|
fmt.Printf("text msg:%s", msg)
|
||||||
|
if msg == "this is a m..m..mock msg" {
|
||||||
|
fmt.Println(".==>msg had banned")
|
||||||
|
return nil, false, errors.New("BANG! This msg has been banned by MockBeforeSendHandler")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, true, nil
|
||||||
|
}
|
||||||
@ -1,7 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/cmd/rpc/open_im_msg/widget"
|
"Open_IM/cmd/rpc/open_im_msg/filters"
|
||||||
rpcChat "Open_IM/internal/rpc/msg"
|
rpcChat "Open_IM/internal/rpc/msg"
|
||||||
"flag"
|
"flag"
|
||||||
)
|
)
|
||||||
@ -10,10 +10,10 @@ func main() {
|
|||||||
rpcPort := flag.Int("port", 10300, "rpc listening port")
|
rpcPort := flag.Int("port", 10300, "rpc listening port")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
rpcServer := rpcChat.NewRpcChatServer(*rpcPort)
|
rpcServer := rpcChat.NewRpcChatServer(*rpcPort)
|
||||||
// register widgets
|
// register filters
|
||||||
|
|
||||||
// mock 注册发送前的拦截器
|
// mock 注册发送前的拦截器
|
||||||
rpcServer.UseWidgetBeforSend(widget.MockBeforeSendHandler)
|
rpcServer.UseBeforSendFilters(filters.MockBeforeSendFilter1, filters.MockBeforeSendFilter2)
|
||||||
|
|
||||||
//
|
//
|
||||||
rpcServer.Run()
|
rpcServer.Run()
|
||||||
|
|||||||
@ -1,24 +0,0 @@
|
|||||||
package widget
|
|
||||||
|
|
||||||
import (
|
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
pbChat "Open_IM/pkg/proto/chat"
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MockBeforeSendHandler is a mock handle that handles custom logic before send msg.
|
|
||||||
func MockBeforeSendHandler(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
|
||||||
fmt.Printf("MockBeforeSendHandler trigger,contentType:%d\n", pb.MsgData.GetContentType())
|
|
||||||
if pb.MsgData.ContentType == constant.Text {
|
|
||||||
msg := string(pb.MsgData.Content)
|
|
||||||
fmt.Printf("text msg:%s", msg)
|
|
||||||
if msg == "this is a m..m..mock msg" {
|
|
||||||
fmt.Println(".==>msg had banned")
|
|
||||||
return nil, false, errors.New("BANG! This msg has been banned by MockBeforeSendHandler")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, true, nil
|
|
||||||
}
|
|
||||||
69
internal/rpc/msg/context.go
Normal file
69
internal/rpc/msg/context.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package msg
|
||||||
|
|
||||||
|
import (
|
||||||
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SendContext is the most important part of RPC SendMsg. It allows us to pass variables between middleware
|
||||||
|
type SendContext struct {
|
||||||
|
ctx context.Context
|
||||||
|
rpc *rpcChat
|
||||||
|
// beforeFilters are filters which will be triggered before send msg
|
||||||
|
beforeFilters []BeforeSendFilter
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext {
|
||||||
|
return &SendContext{
|
||||||
|
ctx: ctx,
|
||||||
|
rpc: rpc,
|
||||||
|
beforeFilters: rpc.beforeSenders,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) SetCtx(ctx context.Context) {
|
||||||
|
c.ctx = ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) Value(key interface{}) interface{} {
|
||||||
|
return c.ctx.Value(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) WithValue(key, val interface{}) {
|
||||||
|
ctx := context.WithValue(c.ctx, key, val)
|
||||||
|
c.SetCtx(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// doBeforeFilters executes the pending filters in the chain inside the calling handler.
|
||||||
|
func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
||||||
|
for _, handler := range c.beforeFilters {
|
||||||
|
res, ok, err := handler(c, pb)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return res, ok, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
||||||
|
replay := pbChat.SendMsgResp{}
|
||||||
|
res, ok, err := c.doBeforeFilters(pb)
|
||||||
|
if err != nil {
|
||||||
|
return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = c.rpc.doSendMsg(c.ctx, pb)
|
||||||
|
if err != nil {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
6
internal/rpc/msg/filters.go
Normal file
6
internal/rpc/msg/filters.go
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
package msg
|
||||||
|
|
||||||
|
import pbChat "Open_IM/pkg/proto/chat"
|
||||||
|
|
||||||
|
// BeforeSendFilter handles custom logic before send msg.
|
||||||
|
type BeforeSendFilter func(ctx *SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)
|
||||||
@ -1,14 +1,12 @@
|
|||||||
package msg
|
package msg
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/rpc/msg/widget"
|
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/kafka"
|
"Open_IM/pkg/common/kafka"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||||
pbChat "Open_IM/pkg/proto/chat"
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -24,7 +22,7 @@ type rpcChat struct {
|
|||||||
etcdAddr []string
|
etcdAddr []string
|
||||||
producer *kafka.Producer
|
producer *kafka.Producer
|
||||||
// beforeSenders are filters which will be triggered before send msg
|
// beforeSenders are filters which will be triggered before send msg
|
||||||
beforeSenders []widget.BeforeSendHandler
|
beforeSenders []BeforeSendFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRpcChatServer(port int) *rpcChat {
|
func NewRpcChatServer(port int) *rpcChat {
|
||||||
@ -39,24 +37,11 @@ func NewRpcChatServer(port int) *rpcChat {
|
|||||||
return &rc
|
return &rc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) UseWidgetBeforSend(hs ...widget.BeforeSendHandler) {
|
// UseBeforSendFilters attaches a global filter to the BeforSendFilter logic
|
||||||
|
func (rpc *rpcChat) UseBeforSendFilters(hs ...BeforeSendFilter) {
|
||||||
rpc.beforeSenders = append(rpc.beforeSenders, hs...)
|
rpc.beforeSenders = append(rpc.beforeSenders, hs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) callWidgetBeforeSend(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) {
|
|
||||||
for _, handler := range rpc.beforeSenders {
|
|
||||||
res, ok, err := handler(ctx, pb)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
return res, ok, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpc *rpcChat) Run() {
|
func (rpc *rpcChat) Run() {
|
||||||
log.Info("", "", "rpc get_token init...")
|
log.Info("", "", "rpc get_token init...")
|
||||||
|
|
||||||
|
|||||||
@ -104,17 +104,14 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
||||||
|
c := NewSendContext(ctx, rpc)
|
||||||
|
return c.SendMsg(pb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rpc *rpcChat) doSendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
||||||
replay := pbChat.SendMsgResp{}
|
replay := pbChat.SendMsgResp{}
|
||||||
log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
|
log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
|
||||||
|
|
||||||
res, ok, err := rpc.callWidgetBeforeSend(ctx, pb)
|
|
||||||
if err != nil {
|
|
||||||
return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0)
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
userRelationshipVerification(pb)
|
userRelationshipVerification(pb)
|
||||||
//if !utils.VerifyToken(pb.Token, pb.SendID) {
|
//if !utils.VerifyToken(pb.Token, pb.SendID) {
|
||||||
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
|
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user