Merge pull request #3339 from openimsdk/cherry-pick-fa3d251

feat: GroupApplicationAgreeMemberEnterNotification splitting [Created by @withchao from #3297]
This commit is contained in:
chao 2025-05-14 16:37:21 +08:00 committed by GitHub
commit bff76dd122
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 113 additions and 19 deletions

View File

@ -7,3 +7,7 @@ multiLogin:
policy: 1
# max num of tokens in one end
maxNumOneEnd: 30
rpcMaxBodySize:
requestMaxBodySize: 8388608
responseMaxBodySize: 8388608

View File

@ -451,12 +451,25 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return nil, err
}
if err := g.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
const singleQuantity = 50
for start := 0; start < len(groupMembers); start += singleQuantity {
end := start + singleQuantity
if end > len(groupMembers) {
end = len(groupMembers)
}
currentMembers := groupMembers[start:end]
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, opUserID, req.InvitedUserIDs...); err != nil {
return nil, err
if err := g.db.CreateGroup(ctx, nil, currentMembers); err != nil {
return nil, err
}
userIDs := datautil.Slice(currentMembers, func(e *model.GroupMember) string {
return e.UserID
})
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...); err != nil {
return nil, err
}
}
return &pbgroup.InviteUserToGroupResp{}, nil
}

View File

@ -520,7 +520,11 @@ func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error {
func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
return g.groupApplicationAgreeMemberEnterNotification(ctx, groupID, SendMessage, invitedOpUserID, entrantUserID...)
}
func (g *NotificationSender) groupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {

View File

@ -378,9 +378,15 @@ type AfterConfig struct {
}
type Share struct {
Secret string `mapstructure:"secret"`
IMAdminUserID []string `mapstructure:"imAdminUserID"`
MultiLogin MultiLogin `mapstructure:"multiLogin"`
Secret string `yaml:"secret"`
IMAdminUserID []string `yaml:"imAdminUserID"`
MultiLogin MultiLogin `yaml:"multiLogin"`
RPCMaxBodySize MaxRequestBody `yaml:"rpcMaxBodySize"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `yaml:"requestMaxBodySize"`
ResponseMaxBodySize int `yaml:"responseMaxBodySize"`
}
type MultiLogin struct {

View File

@ -22,6 +22,7 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"strconv"
"syscall"
"time"
@ -46,8 +47,41 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
// Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
func init() {
prommetrics.RegistryAll()
}
func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody {
for value.Kind() == reflect.Pointer {
value = value.Elem()
}
if value.Kind() == reflect.Struct {
num := value.NumField()
for i := 0; i < num; i++ {
field := value.Field(i)
if !field.CanInterface() {
continue
}
for field.Kind() == reflect.Pointer {
field = field.Elem()
}
switch elem := field.Interface().(type) {
case conf.Share:
return &elem.RPCMaxBodySize
case conf.MaxRequestBody:
return &elem
}
if field.Kind() == reflect.Struct {
if elem := getConfigRpcMaxRequestBody(field); elem != nil {
return elem
}
}
}
}
return nil
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
@ -65,6 +99,25 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
conf.InitNotification(notification)
}
maxRequestBody := getConfigRpcMaxRequestBody(reflect.ValueOf(config))
log.ZDebug(ctx, "rpc start", "rpcMaxRequestBody", maxRequestBody, "rpcRegisterName", rpcRegisterName, "registerIP", registerIP, "listenIP", listenIP)
options = append(options,
mw.GrpcServer(),
)
var clientOptions []grpc.DialOption
if maxRequestBody != nil {
if maxRequestBody.RequestMaxBodySize > 0 {
options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize)))
}
if maxRequestBody.ResponseMaxBodySize > 0 {
options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize)))
}
}
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
@ -101,15 +154,29 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
}
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
client.AddOption(
mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")),
)
if len(clientOptions) > 0 {
client.AddOption(clientOptions...)
}
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
ctx, cancel := context.WithCancelCause(ctx)
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
select {
case <-ctx.Done():
return
case val := <-sigs:
log.ZDebug(ctx, "recv signal", "signal", val.String())
cancel(fmt.Errorf("signal %s", val.String()))
}
}()
if prometheusListenAddr != "" {
options = append(
options, mw.GrpcServer(),
prommetricsUnaryInterceptor(rpcRegisterName),