mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-02 13:19:19 +08:00
group
This commit is contained in:
parent
9341a5f57f
commit
8f51749786
@ -15,6 +15,9 @@ import (
|
|||||||
cp "Open_IM/pkg/common/utils"
|
cp "Open_IM/pkg/common/utils"
|
||||||
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
||||||
"github.com/OpenIMSDK/getcdv3"
|
"github.com/OpenIMSDK/getcdv3"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
"path"
|
||||||
|
"runtime/debug"
|
||||||
|
|
||||||
pbCache "Open_IM/pkg/proto/cache"
|
pbCache "Open_IM/pkg/proto/cache"
|
||||||
pbConversation "Open_IM/pkg/proto/conversation"
|
pbConversation "Open_IM/pkg/proto/conversation"
|
||||||
@ -53,6 +56,34 @@ func NewGroupServer(port int) *groupServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack()))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
funcName := path.Base(info.FullMethod)
|
||||||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("not metadata")
|
||||||
|
}
|
||||||
|
operationID := md.Get("operationID")[0]
|
||||||
|
opUserID := md.Get("opUserID")[0]
|
||||||
|
ctx = trace_log.NewRpcCtx(ctx, funcName, operationID)
|
||||||
|
defer trace_log.ShowLog(ctx)
|
||||||
|
_ = opUserID
|
||||||
|
trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String())
|
||||||
|
resp, err = handler(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
constant.SetErrorForResp(err, resp.(interface {
|
||||||
|
GetCommonResp() *open_im_sdk.CommonResp
|
||||||
|
}).GetCommonResp())
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *groupServer) Run() {
|
func (s *groupServer) Run() {
|
||||||
log.NewInfo("", "group rpc start ")
|
log.NewInfo("", "group rpc start ")
|
||||||
listenIP := ""
|
listenIP := ""
|
||||||
@ -75,6 +106,7 @@ func (s *groupServer) Run() {
|
|||||||
var grpcOpts = []grpc.ServerOption{
|
var grpcOpts = []grpc.ServerOption{
|
||||||
grpc.MaxRecvMsgSize(recvSize),
|
grpc.MaxRecvMsgSize(recvSize),
|
||||||
grpc.MaxSendMsgSize(sendSize),
|
grpc.MaxSendMsgSize(sendSize),
|
||||||
|
grpc.UnaryInterceptor(UnaryServerInterceptor),
|
||||||
}
|
}
|
||||||
if config.Config.Prometheus.Enable {
|
if config.Config.Prometheus.Enable {
|
||||||
promePkg.NewGrpcRequestCounter()
|
promePkg.NewGrpcRequestCounter()
|
||||||
@ -114,7 +146,114 @@ func (s *groupServer) Run() {
|
|||||||
log.NewInfo("", "group rpc success")
|
log.NewInfo("", "group rpc success")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) {
|
func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) {
|
||||||
|
resp := &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}}
|
||||||
|
if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.OwnerUserID); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
var groupOwnerNum int
|
||||||
|
var userIDs []string
|
||||||
|
for _, info := range req.InitMemberList {
|
||||||
|
if info.RoleLevel == constant.GroupOwner {
|
||||||
|
groupOwnerNum++
|
||||||
|
}
|
||||||
|
userIDs = append(userIDs, info.UserID)
|
||||||
|
}
|
||||||
|
if req.OwnerUserID != "" {
|
||||||
|
groupOwnerNum++
|
||||||
|
userIDs = append(userIDs, req.OwnerUserID)
|
||||||
|
}
|
||||||
|
if groupOwnerNum != 1 {
|
||||||
|
return resp, utils.Wrap(constant.ErrArgs, "")
|
||||||
|
}
|
||||||
|
if utils.IsRepeatStringSlice(userIDs) {
|
||||||
|
return resp, utils.Wrap(constant.ErrArgs, "")
|
||||||
|
}
|
||||||
|
users, err := rocksCache.GetUserInfoFromCacheBatch(ctx, userIDs)
|
||||||
|
if err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
if len(users) != len(userIDs) {
|
||||||
|
return resp, utils.Wrap(constant.ErrArgs, "")
|
||||||
|
}
|
||||||
|
userMap := make(map[string]*imdb.User)
|
||||||
|
for i, user := range users {
|
||||||
|
userMap[user.UserID] = users[i]
|
||||||
|
}
|
||||||
|
if err := s.DelGroupAndUserCache(ctx, "", userIDs); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
if err := callbackBeforeCreateGroup(ctx, req); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
groupId := req.GroupInfo.GroupID
|
||||||
|
if groupId == "" {
|
||||||
|
groupId = utils.Md5(req.OperationID + strconv.FormatInt(time.Now().UnixNano(), 10))
|
||||||
|
bi := big.NewInt(0)
|
||||||
|
bi.SetString(groupId[0:8], 16)
|
||||||
|
groupId = bi.String()
|
||||||
|
}
|
||||||
|
groupInfo := imdb.Group{}
|
||||||
|
utils.CopyStructFields(&groupInfo, req.GroupInfo)
|
||||||
|
groupInfo.CreatorUserID = req.OpUserID
|
||||||
|
groupInfo.GroupID = groupId
|
||||||
|
groupInfo.CreateTime = time.Now()
|
||||||
|
if groupInfo.NotificationUpdateTime.Unix() < 0 {
|
||||||
|
groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0)
|
||||||
|
}
|
||||||
|
if req.GroupInfo.GroupType != constant.SuperGroup {
|
||||||
|
var groupMembers []*imdb.GroupMember
|
||||||
|
joinGroup := func(userID string, roleLevel int32) error {
|
||||||
|
groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: req.OpUserID, JoinSource: constant.JoinByInvitation, InviterUserID: req.OpUserID}
|
||||||
|
user := userMap[userID]
|
||||||
|
utils.CopyStructFields(&groupMember, user)
|
||||||
|
if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, groupMember, groupInfo.Ex); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
groupMembers = append(groupMembers, groupMember)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if req.OwnerUserID == "" {
|
||||||
|
if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, info := range req.InitMemberList {
|
||||||
|
if err := joinGroup(info.UserID, info.RoleLevel); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := db.DB.CreateSuperGroup(groupId, userIDs, len(userIDs)); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := (*imdb.Group)(nil).Create(ctx, []*imdb.Group{&groupInfo}); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
utils.CopyStructFields(resp.GroupInfo, groupInfo)
|
||||||
|
resp.GroupInfo.MemberCount = uint32(len(userIDs))
|
||||||
|
if req.GroupInfo.GroupType != constant.SuperGroup {
|
||||||
|
chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, userIDs)
|
||||||
|
} else {
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
if err := rocksCache.DelJoinedSuperGroupIDListFromCache(ctx, userID); err != nil {
|
||||||
|
trace_log.SetContextInfo(ctx, "DelJoinedSuperGroupIDListFromCache", err, "userID", userID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for _, v := range userIDs {
|
||||||
|
chat.SuperGroupNotification(req.OperationID, v, v)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *groupServer) CreateGroup1(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) {
|
||||||
resp = &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}}
|
resp = &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}}
|
||||||
ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID)
|
ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user