feat: GroupApplicationAgreeMemberEnterNotification splitting (#3297)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group

* fix: user msg timestamp

* seq read config

* seq read config

* fix: the source message of the reference is withdrawn, and the referenced message is deleted

* feat: optimize the default notification.yml

* fix: shouldPushOffline

* fix: the sorting is wrong after canceling the administrator in group settings

* feat: Sending messages supports returning fields modified by webhook

* feat: Sending messages supports returning fields modified by webhook

* feat: Sending messages supports returning fields modified by webhook

* fix: oss specifies content-type when uploading

* fix: the version number contains a line break

* fix: the version number contains a line break

* feat: GetConversationsHasReadAndMaxSeq support pinned

* feat: GetConversationsHasReadAndMaxSeq support pinned

* feat: GetConversationsHasReadAndMaxSeq support pinned

* fix: transferring the group owner to a muted member, incremental version error

* feat: GroupApplicationAgreeMemberEnterNotification splitting, rpc body size limit

* feat: GroupApplicationAgreeMemberEnterNotification splitting, rpc body size limit

(cherry picked from commit fa3d251dcb3bd063289b37e3b8530ff53c6a6af8)

# Conflicts:
#	config/share.yml
#	pkg/common/config/config.go
#	pkg/common/startrpc/start.go
This commit is contained in:
chao 2025-04-22 17:02:45 +08:00 committed by withchao
parent cc957886b1
commit 2e58a5cc82
5 changed files with 81 additions and 6 deletions

View File

@ -15,4 +15,8 @@ imAdminUserID: [ imAdmin ]
# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
multiLogin:
policy: 1
maxNumOneEnd: 30
maxNumOneEnd: 30
rpcMaxBodySize:
requestMaxBodySize: 8388608
responseMaxBodySize: 8388608

View File

@ -453,12 +453,25 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return nil, err
}
if err := g.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
const singleQuantity = 50
for start := 0; start < len(groupMembers); start += singleQuantity {
end := start + singleQuantity
if end > len(groupMembers) {
end = len(groupMembers)
}
currentMembers := groupMembers[start:end]
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, req.InvitedUserIDs...); err != nil {
return nil, err
if err := g.db.CreateGroup(ctx, nil, currentMembers); err != nil {
return nil, err
}
userIDs := datautil.Slice(currentMembers, func(e *model.GroupMember) string {
return e.UserID
})
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...); err != nil {
return nil, err
}
}
return &pbgroup.InviteUserToGroupResp{}, nil
}

View File

@ -521,6 +521,10 @@ func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips
}
func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
return g.groupApplicationAgreeMemberEnterNotification(ctx, groupID, SendMessage, invitedOpUserID, entrantUserID...)
}
func (g *NotificationSender) groupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {

View File

@ -384,6 +384,12 @@ type Share struct {
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdminUserID []string `mapstructure:"imAdminUserID"`
MultiLogin MultiLogin `mapstructure:"multiLogin"`
RPCMaxBodySize MaxRequestBody `mapstructure:"rpcMaxBodySize"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `mapstructure:"requestMaxBodySize"`
ResponseMaxBodySize int `mapstructure:"responseMaxBodySize"`
}
type MultiLogin struct {

View File

@ -22,6 +22,7 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"strconv"
"syscall"
"time"
@ -43,6 +44,36 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody {
for value.Kind() == reflect.Pointer {
value = value.Elem()
}
if value.Kind() == reflect.Struct {
num := value.NumField()
for i := 0; i < num; i++ {
field := value.Field(i)
if !field.CanInterface() {
continue
}
for field.Kind() == reflect.Pointer {
field = field.Elem()
}
switch elem := field.Interface().(type) {
case conf.Share:
return &elem.RPCMaxBodySize
case conf.MaxRequestBody:
return &elem
}
if field.Kind() == reflect.Struct {
if elem := getConfigRpcMaxRequestBody(field); elem != nil {
return elem
}
}
}
}
return nil
}
// Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *conf.Share, config T,
@ -50,6 +81,19 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error {
maxRequestBody := &share.RPCMaxBodySize
var clientOptions []grpc.DialOption
if maxRequestBody != nil {
if maxRequestBody.RequestMaxBodySize > 0 {
options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize)))
}
if maxRequestBody.ResponseMaxBodySize > 0 {
options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize)))
}
}
var (
rpcTcpAddr string
netDone = make(chan struct{}, 2)
@ -93,6 +137,10 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
if len(clientOptions) > 0 {
client.AddOption(clientOptions...)
}
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {