diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index d4619b8d5..1d2455755 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -170,6 +170,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.message = &msgFromMQ log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, ctxMsg) aggregationMsgs[string(consumerMessages[i].Key)] = oldM @@ -211,7 +212,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) - t := time.NewTicker(time.Duration(100) * time.Millisecond) + t := time.NewTicker(time.Millisecond * 100) go func() { for { select { diff --git a/internal/objstorage/controller.go b/internal/objstorage/controller.go deleted file mode 100644 index e57c0042a..000000000 --- a/internal/objstorage/controller.go +++ /dev/null @@ -1,235 +0,0 @@ -package objstorage - -import ( - "bytes" - "context" - "crypto/md5" - "encoding/hex" - "encoding/json" - "fmt" - "log" - "math/rand" - "path" - "strconv" - "time" -) - -func NewController(i Interface, kv KV) (*Controller, error) { - if err := i.Init(); err != nil { - return nil, err - } - return &Controller{ - i: i, - kv: kv, - }, nil -} - -type Controller struct { - i Interface - //i *minioImpl - kv KV -} - -func (c *Controller) key(v string) string { - return "OBJECT_STORAGE:" + c.i.Name() + ":" + v -} - -func (c *Controller) putKey(v string) string { - return c.key("put:" + v) -} - -func (c *Controller) pathKey(v string) string { - return c.key("path:" + v) -} - -func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutAddr, error) { - if data, err := c.kv.Get(ctx, c.pathKey(args.Hash)); err == nil { - // 服务器已存在 - var src BucketFile - if err := json.Unmarshal([]byte(data), &src); err != nil { - return nil, err - } - var bucket string - if args.ClearTime <= 0 { - bucket = c.i.PermanentBucket() - } else { - bucket = c.i.ClearBucket() - } - dst := &BucketFile{ - Bucket: bucket, - Name: args.Name, - } - // 直接拷贝一份 - err := c.i.CopyObjectInfo(ctx, &src, dst) - if err == nil { - info, err := c.i.GetObjectInfo(ctx, dst) - if err != nil { - return nil, err - } - return &PutAddr{ - ResourceURL: info.URL, - }, nil - } else if !c.i.IsNotFound(err) { - return nil, err - } - } else if !c.kv.IsNotFound(err) { - return nil, err - } - // 上传逻辑 - name := args.Name - effective := time.Now().Add(args.EffectiveTime) - prefix := c.Prefix(&args.PutArgs) - if minSize := c.i.MinMultipartSize(); args.FragmentSize > 0 && args.FragmentSize < minSize { - args.FragmentSize = minSize - } - var pack int64 - if args.FragmentSize <= 0 || args.Size <= args.FragmentSize { - pack = 1 - } else { - pack = args.Size / args.FragmentSize - if args.Size%args.FragmentSize > 0 { - pack++ - } - } - p := path.Join(path.Dir(args.Name), time.Now().Format("20060102")) - info := putInfo{ - Bucket: c.i.UploadBucket(), - Fragments: make([]string, 0, pack), - FragmentSize: args.FragmentSize, - Name: name, - Hash: args.Hash, - Size: args.Size, - } - if args.ClearTime > 0 { - t := time.Now().Add(args.ClearTime).UnixMilli() - info.ClearTime = &t - } - putURLs := make([]string, 0, pack) - for i := int64(1); i <= pack; i++ { - name := prefix + "_" + strconv.FormatInt(i, 10) + path.Ext(args.Name) - name = path.Join(p, name) - info.Fragments = append(info.Fragments, name) - args.Name = name - put, err := c.i.ApplyPut(ctx, &ApplyPutArgs{ - Bucket: info.Bucket, - Name: name, - Effective: args.EffectiveTime, - Header: args.Header, - }) - if err != nil { - return nil, err - } - putURLs = append(putURLs, put.URL) - } - data, err := json.Marshal(&info) - if err != nil { - return nil, err - } - if err := c.kv.Set(ctx, c.putKey(prefix), string(data), args.EffectiveTime); err != nil { - return nil, err - } - var fragmentSize int64 - if pack == 1 { - fragmentSize = args.Size - } else { - fragmentSize = args.FragmentSize - } - return &PutAddr{ - PutURLs: putURLs, - FragmentSize: fragmentSize, - PutID: prefix, - EffectiveTime: effective, - }, nil -} - -func (c *Controller) ConfirmPut(ctx context.Context, putID string) (*ObjectInfo, error) { - data, err := c.kv.Get(ctx, c.putKey(putID)) - if err != nil { - return nil, err - } - var info putInfo - if err := json.Unmarshal([]byte(data), &info); err != nil { - return nil, err - } - var total int64 - src := make([]BucketFile, len(info.Fragments)) - for i, fragment := range info.Fragments { - state, err := c.i.GetObjectInfo(ctx, &BucketFile{ - Bucket: info.Bucket, - Name: fragment, - }) - if err != nil { - return nil, err - } - total += state.Size - src[i] = BucketFile{ - Bucket: info.Bucket, - Name: fragment, - } - } - if total != info.Size { - return nil, fmt.Errorf("incomplete upload %d/%d", total, info.Size) - } - var dst *BucketFile - if info.ClearTime == nil { - dst = &BucketFile{ - Bucket: c.i.PermanentBucket(), - Name: info.Name, - } - } else { - dst = &BucketFile{ - Bucket: c.i.ClearBucket(), - Name: info.Name, - } - } - if err := c.i.MergeObjectInfo(ctx, src, dst); err != nil { // SourceInfo 0 is too small (2) and it is not the last part - return nil, err - } - obj, err := c.i.GetObjectInfo(ctx, dst) - if err != nil { - return nil, err - } - go func() { - err := c.kv.Del(ctx, c.putKey(putID)) - if err != nil { - log.Println("del key:", err) - } - for _, b := range src { - err = c.i.DeleteObjectInfo(ctx, &b) - if err != nil { - log.Println("del obj:", err) - } - } - }() - return obj, nil -} - -func (c *Controller) Prefix(args *PutArgs) string { - buf := bytes.NewBuffer(nil) - buf.WriteString(args.Name) - buf.WriteString("~~~@~@~~~") - buf.WriteString(strconv.FormatInt(args.Size, 10)) - buf.WriteString(",") - buf.WriteString(args.Hash) - buf.WriteString(",") - buf.WriteString(strconv.FormatInt(int64(args.ClearTime), 10)) - buf.WriteString(",") - buf.WriteString(strconv.FormatInt(int64(args.EffectiveTime), 10)) - buf.WriteString(",") - buf.WriteString(c.i.Name()) - r := make([]byte, 16) - rand.Read(r) - buf.Write(r) - md5v := md5.Sum(buf.Bytes()) - return hex.EncodeToString(md5v[:]) -} - -type putInfo struct { - Bucket string - Fragments []string - FragmentSize int64 - Size int64 - Name string - Hash string - ClearTime *int64 -} diff --git a/internal/objstorage/kv.go b/internal/objstorage/kv.go deleted file mode 100644 index 75c4eafec..000000000 --- a/internal/objstorage/kv.go +++ /dev/null @@ -1,49 +0,0 @@ -package objstorage - -import ( - "context" - "github.com/go-redis/redis/v8" - "log" - "time" -) - -type KV interface { - Get(ctx context.Context, key string) (string, error) - Set(ctx context.Context, key string, val string, expiration time.Duration) error - Del(ctx context.Context, key string) error - IsNotFound(err error) bool -} - -func NewKV() KV { - rdb := redis.NewClient(&redis.Options{ - Addr: "", - Username: "", - Password: "", - }) - return &redisImpl{ - rdb: rdb, - } -} - -type redisImpl struct { - rdb *redis.Client -} - -func (r *redisImpl) Del(ctx context.Context, key string) error { - log.Println("redis del", key) - return r.rdb.Del(ctx, key).Err() -} - -func (r *redisImpl) Get(ctx context.Context, key string) (string, error) { - log.Println("redis get", key) - return r.rdb.Get(ctx, key).Result() -} - -func (r *redisImpl) Set(ctx context.Context, key string, val string, expiration time.Duration) error { - log.Println("redis set", key, val, expiration.String()) - return r.rdb.Set(ctx, key, val, expiration).Err() -} - -func (r *redisImpl) IsNotFound(err error) bool { - return err == redis.Nil -} diff --git a/internal/objstorage/main.go b/internal/objstorage/main.go deleted file mode 100644 index c6ba219e8..000000000 --- a/internal/objstorage/main.go +++ /dev/null @@ -1,112 +0,0 @@ -package objstorage - -import ( - "bytes" - "context" - "crypto/md5" - "encoding/hex" - "fmt" - "io" - "log" - "net/http" - "path" - "time" -) - -func HttpPut(url string, body io.Reader) error { - req, err := http.NewRequest(http.MethodPut, url, body) - if err != nil { - return err - } - client := http.Client{} - resp, err := client.Do(req) - if err != nil { - return err - } - data, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("http [%s] %s", resp.Status, data) - } - if len(data) > 0 { - log.Println("[http body]", string(data)) - } - return nil -} - -func Md5(p []byte) string { - t := md5.Sum(p) - return hex.EncodeToString(t[:]) -} - -func Main() { - ctx := context.Background() - c, err := NewController(&minioImpl{}, NewKV()) - if err != nil { - log.Fatalln(err) - } - - name := "hello.txt" - data := []byte("hello world") - - userID := "10000" - - name = path.Join("user_"+userID, name) - - addr, err := c.ApplyPut(ctx, &FragmentPutArgs{ - PutArgs: PutArgs{ - Name: name, - Size: int64(len(data)), - Hash: Md5(data), - EffectiveTime: time.Second * 60 * 60, - }, - FragmentSize: 2, - }) - if err != nil { - log.Fatalln(err) - } - fmt.Println() - fmt.Println() - - if addr.ResourceURL != "" { - log.Println("服务器已经存在") - return - } - var ( - start int - end = int(addr.FragmentSize) - ) - - for _, u := range addr.PutURLs { - if start >= len(data) { - break - } - if end > len(data) { - end = len(data) - } - _ = u - page := data[start:end] - fmt.Print(string(page)) - start += int(addr.FragmentSize) - end += int(addr.FragmentSize) - err = HttpPut(u, bytes.NewReader(page)) - if err != nil { - log.Fatalln(err) - } - } - fmt.Println() - fmt.Println() - - fmt.Println("[PUT_ID]", addr.PutID) - - info, err := c.ConfirmPut(ctx, addr.PutID) - if err != nil { - log.Fatalln(err) - } - - log.Printf("%+v\n", info) - - log.Println("success") -} diff --git a/internal/objstorage/minio.go b/internal/objstorage/minio.go deleted file mode 100644 index 065b54b6b..000000000 --- a/internal/objstorage/minio.go +++ /dev/null @@ -1,147 +0,0 @@ -package objstorage - -import ( - "context" - "errors" - "fmt" - "github.com/minio/minio-go" - "net/url" - "time" -) - -func NewMinio() Interface { - return &minioImpl{} -} - -type minioImpl struct { - uploadBucket string // 上传桶 - permanentBucket string // 永久桶 - clearBucket string // 自动清理桶 - client *minio.Client -} - -func (m *minioImpl) Init() error { - client, err := minio.New("127.0.0.1:9000", "minioadmin", "minioadmin", false) - if err != nil { - return fmt.Errorf("minio client error: %w", err) - } - m.client = client - m.uploadBucket = "upload" - m.permanentBucket = "permanent" - m.clearBucket = "clear" - return nil -} - -func (m *minioImpl) Name() string { - return "minio" -} - -func (m *minioImpl) MinMultipartSize() int64 { - return 1024 * 1024 * 5 // minio.absMinPartSize -} - -func (m *minioImpl) UploadBucket() string { - return m.uploadBucket -} - -func (m *minioImpl) PermanentBucket() string { - return m.permanentBucket -} - -func (m *minioImpl) ClearBucket() string { - return m.clearBucket -} - -func (m *minioImpl) urlReplace(u *url.URL) { - -} - -func (m *minioImpl) ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) { - if args.Effective <= 0 { - return nil, errors.New("EffectiveTime <= 0") - } - _, err := m.GetObjectInfo(ctx, &BucketFile{ - Bucket: m.uploadBucket, - Name: args.Name, - }) - if err == nil { - return nil, fmt.Errorf("minio bucket %s name %s already exists", args.Bucket, args.Name) - } else if !m.IsNotFound(err) { - return nil, err - } - effective := time.Now().Add(args.Effective) - u, err := m.client.PresignedPutObject(m.uploadBucket, args.Name, args.Effective) - if err != nil { - return nil, fmt.Errorf("minio apply error: %w", err) - } - m.urlReplace(u) - return &PutRes{ - URL: u.String(), - Bucket: m.uploadBucket, - Name: args.Name, - EffectiveTime: effective, - }, nil -} - -func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) { - info, err := m.client.StatObject(args.Bucket, args.Name, minio.StatObjectOptions{}) - if err != nil { - return nil, err - } - return &ObjectInfo{ - URL: "", // todo - Size: info.Size, - Hash: info.ETag, - }, nil -} - -func (m *minioImpl) CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error { - destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil) - if err != nil { - return err - } - return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil)) -} - -func (m *minioImpl) DeleteObjectInfo(ctx context.Context, info *BucketFile) error { - return m.client.RemoveObject(info.Bucket, info.Name) -} - -func (m *minioImpl) MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error { - if err := m.CopyObjectInfo(ctx, src, dst); err != nil { - return err - } - return m.DeleteObjectInfo(ctx, src) -} - -func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error { - switch len(src) { - case 0: - return errors.New("src empty") - case 1: - return m.CopyObjectInfo(ctx, &src[0], dst) - } - destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil) - if err != nil { - return err - } - sources := make([]minio.SourceInfo, len(src)) - for i, s := range src { - sources[i] = minio.NewSourceInfo(s.Bucket, s.Name, nil) - } - return m.client.ComposeObject(destination, sources) // todo -} - -func (m *minioImpl) IsNotFound(err error) bool { - if err == nil { - return false - } - switch e := err.(type) { - case minio.ErrorResponse: - return e.StatusCode == 404 && e.Code == "NoSuchKey" - case *minio.ErrorResponse: - return e.StatusCode == 404 && e.Code == "NoSuchKey" - default: - return false - } -} diff --git a/internal/objstorage/oo.go b/internal/objstorage/oo.go deleted file mode 100644 index 7040bf5e0..000000000 --- a/internal/objstorage/oo.go +++ /dev/null @@ -1,19 +0,0 @@ -package objstorage - -import "context" - -type Interface interface { - Init() error - Name() string - MinMultipartSize() int64 - UploadBucket() string - PermanentBucket() string - ClearBucket() string - ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) - GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) - CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error - DeleteObjectInfo(ctx context.Context, info *BucketFile) error - MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error - MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error - IsNotFound(err error) bool -} diff --git a/internal/objstorage/pub.go b/internal/objstorage/pub.go deleted file mode 100644 index b98dfc94d..000000000 --- a/internal/objstorage/pub.go +++ /dev/null @@ -1,69 +0,0 @@ -package objstorage - -import ( - "net/http" - "time" -) - -type PutRes struct { - URL string - Bucket string - Name string - EffectiveTime time.Time -} - -type FragmentPutArgs struct { - PutArgs - FragmentSize int64 // 分片大小 -} - -type PutArgs struct { - Name string // 文件名 - Size int64 // 大小 - Hash string // md5 - Prefix string // 前缀 - ClearTime time.Duration // 自动清理时间 - EffectiveTime time.Duration // 申请有效时间 - Header http.Header // header -} - -type BucketFile struct { - Bucket string `json:"bucket"` - Name string `json:"name"` -} - -type ObjectInfo struct { - URL string - Size int64 - Hash string -} - -//type PutSpace struct { -// URL string -// EffectiveTime time.Time -//} - -type PutAddr struct { - ResourceURL string - PutID string - FragmentSize int64 - EffectiveTime time.Time - PutURLs []string -} - -type KVData struct { - Bucket string `json:"bucket"` - Name string `json:"name"` -} - -type PutResp struct { - URL string - Time *time.Time -} - -type ApplyPutArgs struct { - Bucket string - Name string - Effective time.Duration // 申请有效时间 - Header http.Header // header -} diff --git a/pkg/proto/msg/msg.proto b/pkg/proto/msg/msg.proto index 3dd526416..991112b7f 100644 --- a/pkg/proto/msg/msg.proto +++ b/pkg/proto/msg/msg.proto @@ -127,7 +127,6 @@ message SetMessageReactionExtensionsResp { repeated KeyValueResp result = 4; } - message GetMessagesReactionExtensionsReq { string sourceID = 1; int32 sessionType = 2; diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index f406c701f..d93553892 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -3,6 +3,7 @@ package rpcclient import ( "context" "encoding/json" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" @@ -13,12 +14,53 @@ import ( "github.com/golang/protobuf/proto" ) +func newContentTypeConf() map[int32]config.NotificationConf { + return map[int32]config.NotificationConf{ + // group + constant.GroupCreatedNotification: config.Config.Notification.GroupCreated, + constant.GroupInfoSetNotification: config.Config.Notification.GroupInfoSet, + constant.JoinGroupApplicationNotification: config.Config.Notification.JoinGroupApplication, + constant.MemberQuitNotification: config.Config.Notification.MemberQuit, + constant.GroupApplicationAcceptedNotification: config.Config.Notification.GroupApplicationAccepted, + constant.GroupApplicationRejectedNotification: config.Config.Notification.GroupApplicationRejected, + constant.GroupOwnerTransferredNotification: config.Config.Notification.GroupOwnerTransferred, + constant.MemberKickedNotification: config.Config.Notification.MemberKicked, + constant.MemberInvitedNotification: config.Config.Notification.MemberInvited, + constant.MemberEnterNotification: config.Config.Notification.MemberEnter, + constant.GroupDismissedNotification: config.Config.Notification.GroupDismissed, + constant.GroupMutedNotification: config.Config.Notification.GroupMuted, + constant.GroupCancelMutedNotification: config.Config.Notification.GroupCancelMuted, + constant.GroupMemberMutedNotification: config.Config.Notification.GroupMemberMuted, + constant.GroupMemberCancelMutedNotification: config.Config.Notification.GroupMemberCancelMuted, + constant.GroupMemberInfoSetNotification: config.Config.Notification.GroupMemberInfoSet, + constant.GroupMemberSetToAdminNotification: config.Config.Notification.GroupMemberSetToAdmin, + constant.GroupMemberSetToOrdinaryUserNotification: config.Config.Notification.GroupMemberSetToOrdinary, + // user + constant.UserInfoUpdatedNotification: config.Config.Notification.UserInfoUpdated, + // friend + constant.FriendApplicationNotification: config.Config.Notification.FriendApplication, + constant.FriendApplicationApprovedNotification: config.Config.Notification.FriendApplicationApproved, + constant.FriendApplicationRejectedNotification: config.Config.Notification.FriendApplicationRejected, + constant.FriendAddedNotification: config.Config.Notification.FriendAdded, + constant.FriendDeletedNotification: config.Config.Notification.FriendDeleted, + constant.FriendRemarkSetNotification: config.Config.Notification.FriendRemarkSet, + constant.BlackAddedNotification: config.Config.Notification.BlackAdded, + constant.BlackDeletedNotification: config.Config.Notification.BlackDeleted, + constant.FriendInfoUpdatedNotification: config.Config.Notification.FriendInfoUpdated, + // conversation + constant.ConversationChangeNotification: config.Config.Notification.ConversationChanged, + constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged, + constant.ConversationPrivateChatNotification: config.Config.Notification.ConversationSetPrivate, + } +} + type MsgClient struct { *MetaClient + contentTypeConf map[int32]config.NotificationConf } func NewMsgClient(zk discoveryregistry.SvcDiscoveryRegistry) *MsgClient { - return &MsgClient{NewMetaClient(zk, config.Config.RpcRegisterName.OpenImMsgName)} + return &MsgClient{MetaClient: NewMetaClient(zk, config.Config.RpcRegisterName.OpenImMsgName), contentTypeConf: newContentTypeConf()} } func (m *MsgClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { @@ -48,9 +90,10 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes return resp, err } -func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, cfg config.NotificationConf, opts ...utils.OptionsOpt) error { +func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...utils.OptionsOpt) error { content, err := json.Marshal(m) if err != nil { + log.ZError(ctx, "MsgClient Notification json.Marshal failed", err) return err } var req msg.SendMsgReq @@ -68,7 +111,7 @@ func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, con // msg.Options = make(map[string]bool, 7) // todo notification get sender name and face url // msg.SenderNickname, msg.SenderFaceURL, err = c.getFaceURLAndName(sendID) - options := config.GetOptionsByNotification(cfg) + options := config.GetOptionsByNotification(c.contentTypeConf[contentType]) options = utils.WithOptions(options, opts...) msg.Options = options offlineInfo.Title = title @@ -77,5 +120,10 @@ func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, con msg.OfflinePushInfo = &offlineInfo req.MsgData = &msg _, err = c.SendMsg(ctx, &req) + if err == nil { + log.ZDebug(ctx, "MsgClient Notification SendMsg success", "req", &req) + } else { + log.ZError(ctx, "MsgClient Notification SendMsg failed %s\n", err, "req", &req) + } return err } diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 44f44ee71..2116c4af6 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -2,8 +2,8 @@ package notification import ( "context" - "encoding/json" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" @@ -218,19 +218,21 @@ func (g *GroupNotificationSender) getFromToUserNickname(ctx context.Context, fro } func (g *GroupNotificationSender) groupNotification(ctx context.Context, contentType int32, m proto.Message, sendID, groupID, recvUserID string) (err error) { - content, err := json.Marshal(m) - if err != nil { - return err - } - notificationMsg := rpcclient.NotificationMsg{ - SendID: sendID, - RecvID: recvUserID, - ContentType: contentType, - SessionType: constant.GroupChatType, - MsgFrom: constant.SysMsgType, - Content: content, - } - return g.msgClient.Notification(ctx, ¬ificationMsg) + //content, err := json.Marshal(m) + //if err != nil { + // return err + //} + //notificationMsg := rpcclient.NotificationMsg{ + // SendID: sendID, + // RecvID: recvUserID, + // ContentType: contentType, + // SessionType: constant.GroupChatType, + // MsgFrom: constant.SysMsgType, + // Content: content, + //} + //return g.msgClient.Notification(ctx, ¬ificationMsg) + + return err } func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, group *relation.GroupModel, members []*relation.GroupMemberModel, userMap map[string]*sdkws.UserInfo) (err error) { @@ -317,12 +319,6 @@ func (g *GroupNotificationSender) mergeGroupFull(ctx context.Context, groupID st } func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, group *relation.GroupModel, members []*relation.GroupMemberModel, needVerification *int32) (err error) { - defer log.ZDebug(ctx, "return") - defer func() { - if err != nil { - log.ZError(ctx, utils.GetFuncName(1)+" failed", err) - } - }() groupInfo, err := g.mergeGroupFull(ctx, group.GroupID, group, &members, nil) if err != nil { return err @@ -331,9 +327,11 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, if needVerification != nil { groupInfoChangedTips.Group.NeedVerification = *needVerification } - return g.groupNotification(ctx, constant.GroupInfoSetNotification, groupInfoChangedTips, mcontext.GetOpUserID(ctx), group.GroupID, "") + return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupInfoSetNotification, constant.SuperGroupChatType, groupInfoChangedTips) } +// #################################### + func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbGroup.JoinGroupReq) (err error) { defer log.ZDebug(ctx, "return") defer func() { @@ -356,6 +354,7 @@ func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.C joinGroupApplicationTips := &sdkws.JoinGroupApplicationTips{Group: group, Applicant: user, ReqMsg: req.ReqMessage} for _, userID := range userIDs { err := g.groupNotification(ctx, constant.JoinGroupApplicationNotification, joinGroupApplicationTips, mcontext.GetOpUserID(ctx), "", userID) + err = g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.JoinGroupApplicationNotification, constant.SuperGroupChatType, joinGroupApplicationTips, config.Config.Notification.JoinGroupApplication) if err != nil { log.ZError(ctx, "JoinGroupApplicationNotification failed", err, "group", req.GroupID, "userID", userID) }