mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-08 04:06:31 +08:00
Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode
This commit is contained in:
commit
2ab6136b2f
@ -96,7 +96,7 @@ credential: #腾讯cos,发送图片、视频、文件时需要,请自行申
|
|||||||
|
|
||||||
object:
|
object:
|
||||||
enable: minio
|
enable: minio
|
||||||
apiURL: http://127.0.0.1:10002/third/object?name=
|
apiURL: http://127.0.0.1:10002/third/object
|
||||||
minio: #MinIO 发送图片、视频、文件时需要,请自行申请后替换,必须修改。 客户端初始化InitSDK,中 object_storage参数为minio
|
minio: #MinIO 发送图片、视频、文件时需要,请自行申请后替换,必须修改。 客户端初始化InitSDK,中 object_storage参数为minio
|
||||||
tempBucket: "openim"
|
tempBucket: "openim"
|
||||||
dataBucket: "openim"
|
dataBucket: "openim"
|
||||||
|
@ -2,6 +2,7 @@ package third
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/obj"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/obj"
|
||||||
@ -11,9 +12,14 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||||
|
u, err := url.Parse(config.Config.Object.ApiURL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
rdb, err := cache.NewRedis()
|
rdb, err := cache.NewRedis()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -32,7 +38,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
|||||||
third.RegisterThirdServer(server, &thirdServer{
|
third.RegisterThirdServer(server, &thirdServer{
|
||||||
thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)),
|
thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)),
|
||||||
userCheck: check.NewUserCheck(client),
|
userCheck: check.NewUserCheck(client),
|
||||||
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db)),
|
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u),
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/obj"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/obj"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
@ -18,6 +17,7 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"io"
|
"io"
|
||||||
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@ -39,8 +39,9 @@ type S3Database interface {
|
|||||||
CleanExpirationObject(ctx context.Context, t time.Time)
|
CleanExpirationObject(ctx context.Context, t time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3Database(obj obj.Interface, hash relation.ObjectHashModelInterface, info relation.ObjectInfoModelInterface, put relation.ObjectPutModelInterface) S3Database {
|
func NewS3Database(obj obj.Interface, hash relation.ObjectHashModelInterface, info relation.ObjectInfoModelInterface, put relation.ObjectPutModelInterface, url *url.URL) S3Database {
|
||||||
return &s3Database{
|
return &s3Database{
|
||||||
|
url: url,
|
||||||
obj: obj,
|
obj: obj,
|
||||||
hash: hash,
|
hash: hash,
|
||||||
info: info,
|
info: info,
|
||||||
@ -49,6 +50,7 @@ func NewS3Database(obj obj.Interface, hash relation.ObjectHashModelInterface, in
|
|||||||
}
|
}
|
||||||
|
|
||||||
type s3Database struct {
|
type s3Database struct {
|
||||||
|
url *url.URL
|
||||||
obj obj.Interface
|
obj obj.Interface
|
||||||
hash relation.ObjectHashModelInterface
|
hash relation.ObjectHashModelInterface
|
||||||
info relation.ObjectInfoModelInterface
|
info relation.ObjectInfoModelInterface
|
||||||
@ -96,7 +98,23 @@ func (c *s3Database) CheckHash(hash string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *s3Database) urlName(name string) string {
|
func (c *s3Database) urlName(name string) string {
|
||||||
return config.Config.Object.ApiURL + name
|
u := url.URL{
|
||||||
|
Scheme: c.url.Scheme,
|
||||||
|
Opaque: c.url.Opaque,
|
||||||
|
User: c.url.User,
|
||||||
|
Host: c.url.Host,
|
||||||
|
Path: c.url.Path,
|
||||||
|
RawPath: c.url.RawPath,
|
||||||
|
OmitHost: c.url.OmitHost,
|
||||||
|
ForceQuery: c.url.ForceQuery,
|
||||||
|
RawQuery: c.url.RawQuery,
|
||||||
|
Fragment: c.url.Fragment,
|
||||||
|
RawFragment: c.url.RawFragment,
|
||||||
|
}
|
||||||
|
v := make(url.Values, 1)
|
||||||
|
v.Set("name", name)
|
||||||
|
u.RawQuery = v.Encode()
|
||||||
|
return u.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *s3Database) UUID() string {
|
func (c *s3Database) UUID() string {
|
||||||
@ -157,8 +175,15 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
|
|||||||
if put.PutID == "" {
|
if put.PutID == "" {
|
||||||
put.PutID = c.UUID()
|
put.PutID = c.UUID()
|
||||||
}
|
}
|
||||||
if _, err := c.put.Take(ctx, put.PutID); err == nil {
|
if v, err := c.put.Take(ctx, put.PutID); err == nil {
|
||||||
return nil, errs.ErrDuplicateKey.Wrap(fmt.Sprintf("duplicate put id %s", put.PutID))
|
now := time.Now().UnixMilli()
|
||||||
|
if v.EffectiveTime.UnixMilli() <= now {
|
||||||
|
if err := c.put.DelPut(ctx, []string{v.PutID}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, errs.ErrDuplicateKey.Wrap(fmt.Sprintf("duplicate put id %s", put.PutID))
|
||||||
|
}
|
||||||
} else if !c.isNotFound(err) {
|
} else if !c.isNotFound(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -203,9 +228,6 @@ func (c *s3Database) GetPut(ctx context.Context, req *third.GetPutReq) (*third.G
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if up.Complete {
|
|
||||||
return nil, errors.New("up completed")
|
|
||||||
}
|
|
||||||
reader, err := c.obj.GetObject(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: path.Join(up.Path, urlsName)})
|
reader, err := c.obj.GetObject(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: path.Join(up.Path, urlsName)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -273,9 +295,6 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if put.Complete {
|
|
||||||
return nil, errs.ErrFileUploadedComplete.Wrap("put complete")
|
|
||||||
}
|
|
||||||
now := time.Now().UnixMilli()
|
now := time.Now().UnixMilli()
|
||||||
if put.EffectiveTime.UnixMilli() < now {
|
if put.EffectiveTime.UnixMilli() < now {
|
||||||
return nil, errs.ErrFileUploadedExpired.Wrap("put expired")
|
return nil, errs.ErrFileUploadedExpired.Wrap("put expired")
|
||||||
@ -403,8 +422,8 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
|||||||
if err := c.info.SetObject(ctx, o); err != nil {
|
if err := c.info.SetObject(ctx, o); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := c.put.SetCompleted(ctx, put.PutID); err != nil {
|
if err := c.put.DelPut(ctx, []string{put.PutID}); err != nil {
|
||||||
log.ZError(ctx, "SetCompleted", err, "PutID", put.PutID)
|
log.ZError(ctx, "DelPut", err, "PutID", put.PutID)
|
||||||
}
|
}
|
||||||
return &third.ConfirmPutResp{
|
return &third.ConfirmPutResp{
|
||||||
Url: c.urlName(o.Name),
|
Url: c.urlName(o.Name),
|
||||||
|
@ -18,7 +18,6 @@ type ObjectPutModel struct {
|
|||||||
ObjectSize int64 `gorm:"column:object_size"`
|
ObjectSize int64 `gorm:"column:object_size"`
|
||||||
FragmentSize int64 `gorm:"column:fragment_size"`
|
FragmentSize int64 `gorm:"column:fragment_size"`
|
||||||
PutURLsHash string `gorm:"column:put_urls_hash"`
|
PutURLsHash string `gorm:"column:put_urls_hash"`
|
||||||
Complete bool `gorm:"column:complete"`
|
|
||||||
ValidTime *time.Time `gorm:"column:valid_time"`
|
ValidTime *time.Time `gorm:"column:valid_time"`
|
||||||
EffectiveTime time.Time `gorm:"column:effective_time"`
|
EffectiveTime time.Time `gorm:"column:effective_time"`
|
||||||
CreateTime time.Time `gorm:"column:create_time"`
|
CreateTime time.Time `gorm:"column:create_time"`
|
||||||
|
Loading…
x
Reference in New Issue
Block a user