From 4683712e0532f95e1e5a3b10c55f2762193155da Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 16 Feb 2023 16:19:53 +0800 Subject: [PATCH] object storage --- internal/objstorage/controller.go | 232 ++++++++++++++++++++++++ internal/objstorage/kv.go | 49 +++++ internal/objstorage/main.go | 112 ++++++++++++ internal/objstorage/minio.go | 143 +++++++++++++++ internal/objstorage/oo.go | 18 ++ internal/objstorage/pub.go | 69 +++++++ internal/rpc/msg/delete.go | 20 +- internal/rpc/msg/extend_msg_callback.go | 91 +++++----- pkg/callbackstruct/message.go | 23 +++ pkg/proto/file/file.proto | 37 ++++ pkg/proto/proto_dir.cfg | 1 + 11 files changed, 742 insertions(+), 53 deletions(-) create mode 100644 internal/objstorage/controller.go create mode 100644 internal/objstorage/kv.go create mode 100644 internal/objstorage/main.go create mode 100644 internal/objstorage/minio.go create mode 100644 internal/objstorage/oo.go create mode 100644 internal/objstorage/pub.go create mode 100644 pkg/proto/file/file.proto diff --git a/internal/objstorage/controller.go b/internal/objstorage/controller.go new file mode 100644 index 000000000..621e79aa6 --- /dev/null +++ b/internal/objstorage/controller.go @@ -0,0 +1,232 @@ +package objstorage + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "math/rand" + "path" + "strconv" + "time" +) + +func NewController(i Interface, kv KV) (*Controller, error) { + if err := i.Init(); err != nil { + return nil, err + } + return &Controller{ + i: i, + kv: kv, + }, nil +} + +type Controller struct { + i Interface + //i *minioImpl + kv KV +} + +func (c *Controller) key(v string) string { + return "OBJECT_STORAGE:" + c.i.Name() + ":" + v +} + +func (c *Controller) putKey(v string) string { + return c.key("put:" + v) +} + +func (c *Controller) pathKey(v string) string { + return c.key("path:" + v) +} + +func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutAddr, error) { + if data, err := c.kv.Get(ctx, c.pathKey(args.Hash)); err == nil { + // 服务器已存在 + var src BucketFile + if err := json.Unmarshal([]byte(data), &src); err != nil { + return nil, err + } + var bucket string + if args.ClearTime <= 0 { + bucket = c.i.PermanentBucket() + } else { + bucket = c.i.ClearBucket() + } + dst := &BucketFile{ + Bucket: bucket, + Name: args.Name, + } + // 直接拷贝一份 + err := c.i.CopyObjetInfo(ctx, &src, dst) + if err == nil { + info, err := c.i.GetObjectInfo(ctx, dst) + if err != nil { + return nil, err + } + return &PutAddr{ + ResourceURL: info.URL, + }, nil + } else if !c.i.IsNotFound(err) { + return nil, err + } + } else if !c.kv.IsNotFound(err) { + return nil, err + } + // 上传逻辑 + name := args.Name + effective := time.Now().Add(args.EffectiveTime) + prefix := c.Prefix(&args.PutArgs) + var pack int64 + if args.FragmentSize <= 0 || args.Size <= args.FragmentSize { + pack = 1 + } else { + pack = args.Size / args.FragmentSize + if args.Size%args.FragmentSize > 0 { + pack++ + } + } + p := path.Join(path.Dir(args.Name), time.Now().Format("20060102")) + info := putInfo{ + Bucket: c.i.UploadBucket(), + Fragments: make([]string, 0, pack), + FragmentSize: args.FragmentSize, + Name: name, + Hash: args.Hash, + Size: args.Size, + } + if args.ClearTime > 0 { + t := time.Now().Add(args.ClearTime).UnixMilli() + info.ClearTime = &t + } + putURLs := make([]string, 0, pack) + for i := int64(1); i <= pack; i++ { + name := prefix + "_" + strconv.FormatInt(i, 10) + path.Ext(args.Name) + name = path.Join(p, name) + info.Fragments = append(info.Fragments, name) + args.Name = name + put, err := c.i.ApplyPut(ctx, &ApplyPutArgs{ + Bucket: info.Bucket, + Name: name, + Effective: args.EffectiveTime, + Header: args.Header, + }) + if err != nil { + return nil, err + } + putURLs = append(putURLs, put.URL) + } + data, err := json.Marshal(&info) + if err != nil { + return nil, err + } + if err := c.kv.Set(ctx, c.putKey(prefix), string(data), args.EffectiveTime); err != nil { + return nil, err + } + var fragmentSize int64 + if pack == 1 { + fragmentSize = args.Size + } else { + fragmentSize = args.FragmentSize + } + return &PutAddr{ + PutURLs: putURLs, + FragmentSize: fragmentSize, + PutID: prefix, + EffectiveTime: effective, + }, nil +} + +func (c *Controller) ConfirmPut(ctx context.Context, putID string) (*ObjectInfo, error) { + data, err := c.kv.Get(ctx, c.putKey(putID)) + if err != nil { + return nil, err + } + var info putInfo + if err := json.Unmarshal([]byte(data), &info); err != nil { + return nil, err + } + var total int64 + src := make([]BucketFile, len(info.Fragments)) + for i, fragment := range info.Fragments { + state, err := c.i.GetObjectInfo(ctx, &BucketFile{ + Bucket: info.Bucket, + Name: fragment, + }) + if err != nil { + return nil, err + } + total += state.Size + src[i] = BucketFile{ + Bucket: info.Bucket, + Name: fragment, + } + } + if total != info.Size { + return nil, fmt.Errorf("incomplete upload %d/%d", total, info.Size) + } + var dst *BucketFile + if info.ClearTime == nil { + dst = &BucketFile{ + Bucket: c.i.PermanentBucket(), + Name: info.Name, + } + } else { + dst = &BucketFile{ + Bucket: c.i.ClearBucket(), + Name: info.Name, + } + } + if err := c.i.MergeObjectInfo(ctx, src, dst); err != nil { // SourceInfo 0 is too small (2) and it is not the last part + return nil, err + } + obj, err := c.i.GetObjectInfo(ctx, dst) + if err != nil { + return nil, err + } + go func() { + err := c.kv.Del(ctx, c.putKey(putID)) + if err != nil { + log.Println("del key:", err) + } + for _, b := range src { + err = c.i.DeleteObjetInfo(ctx, &b) + if err != nil { + log.Println("del obj:", err) + } + } + }() + return obj, nil +} + +func (c *Controller) Prefix(args *PutArgs) string { + buf := bytes.NewBuffer(nil) + buf.WriteString(args.Name) + buf.WriteString("~~~@~@~~~") + buf.WriteString(strconv.FormatInt(args.Size, 10)) + buf.WriteString(",") + buf.WriteString(args.Hash) + buf.WriteString(",") + buf.WriteString(strconv.FormatInt(int64(args.ClearTime), 10)) + buf.WriteString(",") + buf.WriteString(strconv.FormatInt(int64(args.EffectiveTime), 10)) + buf.WriteString(",") + buf.WriteString(c.i.Name()) + r := make([]byte, 16) + rand.Read(r) + buf.Write(r) + md5v := md5.Sum(buf.Bytes()) + return hex.EncodeToString(md5v[:]) +} + +type putInfo struct { + Bucket string + Fragments []string + FragmentSize int64 + Size int64 + Name string + Hash string + ClearTime *int64 +} diff --git a/internal/objstorage/kv.go b/internal/objstorage/kv.go new file mode 100644 index 000000000..75c4eafec --- /dev/null +++ b/internal/objstorage/kv.go @@ -0,0 +1,49 @@ +package objstorage + +import ( + "context" + "github.com/go-redis/redis/v8" + "log" + "time" +) + +type KV interface { + Get(ctx context.Context, key string) (string, error) + Set(ctx context.Context, key string, val string, expiration time.Duration) error + Del(ctx context.Context, key string) error + IsNotFound(err error) bool +} + +func NewKV() KV { + rdb := redis.NewClient(&redis.Options{ + Addr: "", + Username: "", + Password: "", + }) + return &redisImpl{ + rdb: rdb, + } +} + +type redisImpl struct { + rdb *redis.Client +} + +func (r *redisImpl) Del(ctx context.Context, key string) error { + log.Println("redis del", key) + return r.rdb.Del(ctx, key).Err() +} + +func (r *redisImpl) Get(ctx context.Context, key string) (string, error) { + log.Println("redis get", key) + return r.rdb.Get(ctx, key).Result() +} + +func (r *redisImpl) Set(ctx context.Context, key string, val string, expiration time.Duration) error { + log.Println("redis set", key, val, expiration.String()) + return r.rdb.Set(ctx, key, val, expiration).Err() +} + +func (r *redisImpl) IsNotFound(err error) bool { + return err == redis.Nil +} diff --git a/internal/objstorage/main.go b/internal/objstorage/main.go new file mode 100644 index 000000000..c6ba219e8 --- /dev/null +++ b/internal/objstorage/main.go @@ -0,0 +1,112 @@ +package objstorage + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/hex" + "fmt" + "io" + "log" + "net/http" + "path" + "time" +) + +func HttpPut(url string, body io.Reader) error { + req, err := http.NewRequest(http.MethodPut, url, body) + if err != nil { + return err + } + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + data, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http [%s] %s", resp.Status, data) + } + if len(data) > 0 { + log.Println("[http body]", string(data)) + } + return nil +} + +func Md5(p []byte) string { + t := md5.Sum(p) + return hex.EncodeToString(t[:]) +} + +func Main() { + ctx := context.Background() + c, err := NewController(&minioImpl{}, NewKV()) + if err != nil { + log.Fatalln(err) + } + + name := "hello.txt" + data := []byte("hello world") + + userID := "10000" + + name = path.Join("user_"+userID, name) + + addr, err := c.ApplyPut(ctx, &FragmentPutArgs{ + PutArgs: PutArgs{ + Name: name, + Size: int64(len(data)), + Hash: Md5(data), + EffectiveTime: time.Second * 60 * 60, + }, + FragmentSize: 2, + }) + if err != nil { + log.Fatalln(err) + } + fmt.Println() + fmt.Println() + + if addr.ResourceURL != "" { + log.Println("服务器已经存在") + return + } + var ( + start int + end = int(addr.FragmentSize) + ) + + for _, u := range addr.PutURLs { + if start >= len(data) { + break + } + if end > len(data) { + end = len(data) + } + _ = u + page := data[start:end] + fmt.Print(string(page)) + start += int(addr.FragmentSize) + end += int(addr.FragmentSize) + err = HttpPut(u, bytes.NewReader(page)) + if err != nil { + log.Fatalln(err) + } + } + fmt.Println() + fmt.Println() + + fmt.Println("[PUT_ID]", addr.PutID) + + info, err := c.ConfirmPut(ctx, addr.PutID) + if err != nil { + log.Fatalln(err) + } + + log.Printf("%+v\n", info) + + log.Println("success") +} diff --git a/internal/objstorage/minio.go b/internal/objstorage/minio.go new file mode 100644 index 000000000..3400c8f8a --- /dev/null +++ b/internal/objstorage/minio.go @@ -0,0 +1,143 @@ +package objstorage + +import ( + "context" + "errors" + "fmt" + "github.com/minio/minio-go" + "net/url" + "time" +) + +//func NewMinio() Interface { +// return &minioImpl{} +//} + +type minioImpl struct { + uploadBucket string // 上传桶 + permanentBucket string // 永久桶 + clearBucket string // 自动清理桶 + client *minio.Client +} + +func (m *minioImpl) Init() error { + client, err := minio.New("127.0.0.1:9000", "minioadmin", "minioadmin", false) + if err != nil { + return fmt.Errorf("minio client error: %w", err) + } + m.client = client + m.uploadBucket = "upload" + m.permanentBucket = "permanent" + m.clearBucket = "clear" + return nil +} + +func (m *minioImpl) Name() string { + return "minio" +} + +func (m *minioImpl) UploadBucket() string { + return m.uploadBucket +} + +func (m *minioImpl) PermanentBucket() string { + return m.permanentBucket +} + +func (m *minioImpl) ClearBucket() string { + return m.clearBucket +} + +func (m *minioImpl) urlReplace(u *url.URL) { + +} + +func (m *minioImpl) ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) { + if args.Effective <= 0 { + return nil, errors.New("EffectiveTime <= 0") + } + _, err := m.GetObjectInfo(ctx, &BucketFile{ + Bucket: m.uploadBucket, + Name: args.Name, + }) + if err == nil { + return nil, fmt.Errorf("minio bucket %s name %s already exists", args.Bucket, args.Name) + } else if !m.IsNotFound(err) { + return nil, err + } + effective := time.Now().Add(args.Effective) + u, err := m.client.PresignedPutObject(m.uploadBucket, args.Name, args.Effective) + if err != nil { + return nil, fmt.Errorf("minio apply error: %w", err) + } + m.urlReplace(u) + return &PutRes{ + URL: u.String(), + Bucket: m.uploadBucket, + Name: args.Name, + EffectiveTime: effective, + }, nil +} + +func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) { + info, err := m.client.StatObject(args.Bucket, args.Name, minio.StatObjectOptions{}) + if err != nil { + return nil, err + } + return &ObjectInfo{ + URL: "", // todo + Size: info.Size, + Hash: info.ETag, + }, nil +} + +func (m *minioImpl) CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error { + destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil) + if err != nil { + return err + } + return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil)) +} + +func (m *minioImpl) DeleteObjetInfo(ctx context.Context, info *BucketFile) error { + return m.client.RemoveObject(info.Bucket, info.Name) +} + +func (m *minioImpl) MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error { + if err := m.CopyObjetInfo(ctx, src, dst); err != nil { + return err + } + return m.DeleteObjetInfo(ctx, src) +} + +func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error { + switch len(src) { + case 0: + return errors.New("src empty") + case 1: + return m.CopyObjetInfo(ctx, &src[0], dst) + } + destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil) + if err != nil { + return err + } + sources := make([]minio.SourceInfo, len(src)) + for i, s := range src { + sources[i] = minio.NewSourceInfo(s.Bucket, s.Name, nil) + } + return m.client.ComposeObject(destination, sources) // todo +} + +func (m *minioImpl) IsNotFound(err error) bool { + if err == nil { + return false + } + switch e := err.(type) { + case minio.ErrorResponse: + return e.StatusCode == 404 && e.Code == "NoSuchKey" + case *minio.ErrorResponse: + return e.StatusCode == 404 && e.Code == "NoSuchKey" + default: + return false + } +} diff --git a/internal/objstorage/oo.go b/internal/objstorage/oo.go new file mode 100644 index 000000000..33a0eb5b5 --- /dev/null +++ b/internal/objstorage/oo.go @@ -0,0 +1,18 @@ +package objstorage + +import "context" + +type Interface interface { + Init() error + Name() string + UploadBucket() string + PermanentBucket() string + ClearBucket() string + ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) + GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) + CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error + DeleteObjetInfo(ctx context.Context, info *BucketFile) error + MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error + MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error + IsNotFound(err error) bool +} diff --git a/internal/objstorage/pub.go b/internal/objstorage/pub.go new file mode 100644 index 000000000..b98dfc94d --- /dev/null +++ b/internal/objstorage/pub.go @@ -0,0 +1,69 @@ +package objstorage + +import ( + "net/http" + "time" +) + +type PutRes struct { + URL string + Bucket string + Name string + EffectiveTime time.Time +} + +type FragmentPutArgs struct { + PutArgs + FragmentSize int64 // 分片大小 +} + +type PutArgs struct { + Name string // 文件名 + Size int64 // 大小 + Hash string // md5 + Prefix string // 前缀 + ClearTime time.Duration // 自动清理时间 + EffectiveTime time.Duration // 申请有效时间 + Header http.Header // header +} + +type BucketFile struct { + Bucket string `json:"bucket"` + Name string `json:"name"` +} + +type ObjectInfo struct { + URL string + Size int64 + Hash string +} + +//type PutSpace struct { +// URL string +// EffectiveTime time.Time +//} + +type PutAddr struct { + ResourceURL string + PutID string + FragmentSize int64 + EffectiveTime time.Time + PutURLs []string +} + +type KVData struct { + Bucket string `json:"bucket"` + Name string `json:"name"` +} + +type PutResp struct { + URL string + Time *time.Time +} + +type ApplyPutArgs struct { + Bucket string + Name string + Effective time.Duration // 申请有效时间 + Header http.Header // header +} diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index d2e5e7b84..5695912a0 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -9,7 +9,7 @@ import ( func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) { resp := &common.DelMsgListResp{} - if err := m.MsgInterface.DelMsgFromCache(ctx, req.UserID, req.SeqList); err != nil { + if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil { return nil, err } DeleteMessageNotification(ctx, req.UserID, req.SeqList) @@ -21,11 +21,14 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } - maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) - if err != nil { - return nil, err - } - if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { + //maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) + //if err != nil { + // return nil, err + //} + //if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { + // return nil, err + //} + if err := m.MsgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, req.UserID, 0); err != nil { return nil, err } return resp, nil @@ -36,8 +39,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } - if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { + if err := m.MsgInterface.CleanUpUserMsg(ctx, req.UserID); err != nil { return nil, err } + //if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { + // return nil, err + //} return resp, nil } diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index f9876ee1d..da00356b4 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -48,52 +48,51 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti MsgFirstModifyTime: setReq.MsgFirstModifyTime, } resp := &cbapi.CallbackDeleteMessageReactionExtResp{} - defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp) return http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) } -//func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error { -// if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { -// return nil -// } -// req := cbapi.CallbackGetMessageListReactionExtReq{ -// OperationID: getReq.OperationID, -// CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand, -// SourceID: getReq.SourceID, -// OpUserID: getReq.OpUserID, -// SessionType: getReq.SessionType, -// TypeKeyList: getReq.TypeKeyList, -// MessageKeyList: getReq.MessageReactionKeyList, -// } -// resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp} -// defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp) -// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { -// callbackResp.ErrCode = http2.StatusInternalServerError -// callbackResp.ErrMsg = err.Error() -// } -// return resp -//} -//func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp { -// callbackResp := cbapi.CommonCallbackResp{OperationID: setReq.OperationID} -// log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) -// req := cbapi.CallbackAddMessageReactionExtReq{ -// OperationID: setReq.OperationID, -// CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand, -// SourceID: setReq.SourceID, -// OpUserID: setReq.OpUserID, -// SessionType: setReq.SessionType, -// ReactionExtensionList: setReq.ReactionExtensionList, -// ClientMsgID: setReq.ClientMsgID, -// IsReact: setReq.IsReact, -// IsExternalExtensions: setReq.IsExternalExtensions, -// MsgFirstModifyTime: setReq.MsgFirstModifyTime, -// } -// resp := &cbapi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} -// defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime) -// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { -// callbackResp.ErrCode = http2.StatusInternalServerError -// callbackResp.ErrMsg = err.Error() -// } -// return resp -// -//} +func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error { + if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { + return nil + } + req := cbapi.CallbackGetMessageListReactionExtReq{ + OperationID: getReq.OperationID, + CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand, + SourceID: getReq.SourceID, + OpUserID: getReq.OpUserID, + SessionType: getReq.SessionType, + TypeKeyList: getReq.TypeKeyList, + MessageKeyList: getReq.MessageReactionKeyList, + } + resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp} + defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return resp +} + +func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp { + callbackResp := cbapi.CommonCallbackResp{} + req := cbapi.CallbackAddMessageReactionExtReq{ + OperationID: setReq.OperationID, + CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand, + SourceID: setReq.SourceID, + OpUserID: setReq.OpUserID, + SessionType: setReq.SessionType, + ReactionExtensionList: setReq.ReactionExtensionList, + ClientMsgID: setReq.ClientMsgID, + IsReact: setReq.IsReact, + IsExternalExtensions: setReq.IsExternalExtensions, + MsgFirstModifyTime: setReq.MsgFirstModifyTime, + } + resp := &cbapi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} + defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime) + if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return resp + +} diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index ce8d72629..d28b50135 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -99,3 +99,26 @@ type CallbackDeleteMessageReactionExtResp struct { ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } + +type CallbackGetMessageListReactionExtReq struct { + OperationID string `json:"operationID"` + CallbackCommand string `json:"callbackCommand"` + SourceID string `json:"sourceID"` + OpUserID string `json:"opUserID"` + SessionType int32 `json:"sessionType"` + TypeKeyList []string `json:"typeKeyList"` + MessageKeyList []*msg.GetMessageListReactionExtensionsReq_MessageReactionKey `json:"messageKeyList"` +} + +type CallbackAddMessageReactionExtReq struct { + OperationID string `json:"operationID"` + CallbackCommand string `json:"callbackCommand"` + SourceID string `json:"sourceID"` + OpUserID string `json:"opUserID"` + SessionType int32 `json:"sessionType"` + ReactionExtensionList map[string]*sdkws.KeyValue `json:"reactionExtensionList"` + ClientMsgID string `json:"clientMsgID"` + IsReact bool `json:"isReact"` + IsExternalExtensions bool `json:"isExternalExtensions"` + MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` +} diff --git a/pkg/proto/file/file.proto b/pkg/proto/file/file.proto new file mode 100644 index 000000000..5c1ab246b --- /dev/null +++ b/pkg/proto/file/file.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +option go_package = "Open_IM/pkg/proto/file;file"; +package file; + + +message ApplySpaceReq { + string name = 1; // 文件名字 + int64 size = 2; // 大小 + string hash = 3; // md5 + uint32 purpose = 4; // 用途 + string contentType = 5; +} + +message ApplySpaceResp { + string url = 1; // 不为空表示已存在 + int64 size = 2; // 分片大小 + repeated string put = 3;// put地址 + string confirmID = 4; // 确认ID +} + +message ConfirmSpaceReq { + string confirmID = 1; // 确认ID +} + +message ConfirmSpaceResp { + string confirmID = 1; +} + +service file { + rpc ApplySpaceReq(ApplySpaceReq) returns(ApplySpaceResp); +} + + + + + + diff --git a/pkg/proto/proto_dir.cfg b/pkg/proto/proto_dir.cfg index a85a69155..7509903fe 100644 --- a/pkg/proto/proto_dir.cfg +++ b/pkg/proto/proto_dir.cfg @@ -10,4 +10,5 @@ all_proto=( relay/relay.proto sdkws/ws.proto conversation/conversation.proto + file/file.proto )