mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-26 03:26:57 +08:00
group
This commit is contained in:
parent
16231ee077
commit
84ee1cdf9f
@ -170,6 +170,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
||||
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
|
||||
ctxMsg.message = &msgFromMQ
|
||||
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
|
||||
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
|
||||
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
||||
oldM = append(oldM, ctxMsg)
|
||||
aggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
||||
@ -211,7 +212,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
||||
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
|
||||
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
|
||||
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
|
||||
t := time.NewTicker(time.Duration(100) * time.Millisecond)
|
||||
t := time.NewTicker(time.Millisecond * 100)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
@ -1,235 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewController(i Interface, kv KV) (*Controller, error) {
|
||||
if err := i.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Controller{
|
||||
i: i,
|
||||
kv: kv,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type Controller struct {
|
||||
i Interface
|
||||
//i *minioImpl
|
||||
kv KV
|
||||
}
|
||||
|
||||
func (c *Controller) key(v string) string {
|
||||
return "OBJECT_STORAGE:" + c.i.Name() + ":" + v
|
||||
}
|
||||
|
||||
func (c *Controller) putKey(v string) string {
|
||||
return c.key("put:" + v)
|
||||
}
|
||||
|
||||
func (c *Controller) pathKey(v string) string {
|
||||
return c.key("path:" + v)
|
||||
}
|
||||
|
||||
func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutAddr, error) {
|
||||
if data, err := c.kv.Get(ctx, c.pathKey(args.Hash)); err == nil {
|
||||
// 服务器已存在
|
||||
var src BucketFile
|
||||
if err := json.Unmarshal([]byte(data), &src); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var bucket string
|
||||
if args.ClearTime <= 0 {
|
||||
bucket = c.i.PermanentBucket()
|
||||
} else {
|
||||
bucket = c.i.ClearBucket()
|
||||
}
|
||||
dst := &BucketFile{
|
||||
Bucket: bucket,
|
||||
Name: args.Name,
|
||||
}
|
||||
// 直接拷贝一份
|
||||
err := c.i.CopyObjectInfo(ctx, &src, dst)
|
||||
if err == nil {
|
||||
info, err := c.i.GetObjectInfo(ctx, dst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &PutAddr{
|
||||
ResourceURL: info.URL,
|
||||
}, nil
|
||||
} else if !c.i.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
} else if !c.kv.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
// 上传逻辑
|
||||
name := args.Name
|
||||
effective := time.Now().Add(args.EffectiveTime)
|
||||
prefix := c.Prefix(&args.PutArgs)
|
||||
if minSize := c.i.MinMultipartSize(); args.FragmentSize > 0 && args.FragmentSize < minSize {
|
||||
args.FragmentSize = minSize
|
||||
}
|
||||
var pack int64
|
||||
if args.FragmentSize <= 0 || args.Size <= args.FragmentSize {
|
||||
pack = 1
|
||||
} else {
|
||||
pack = args.Size / args.FragmentSize
|
||||
if args.Size%args.FragmentSize > 0 {
|
||||
pack++
|
||||
}
|
||||
}
|
||||
p := path.Join(path.Dir(args.Name), time.Now().Format("20060102"))
|
||||
info := putInfo{
|
||||
Bucket: c.i.UploadBucket(),
|
||||
Fragments: make([]string, 0, pack),
|
||||
FragmentSize: args.FragmentSize,
|
||||
Name: name,
|
||||
Hash: args.Hash,
|
||||
Size: args.Size,
|
||||
}
|
||||
if args.ClearTime > 0 {
|
||||
t := time.Now().Add(args.ClearTime).UnixMilli()
|
||||
info.ClearTime = &t
|
||||
}
|
||||
putURLs := make([]string, 0, pack)
|
||||
for i := int64(1); i <= pack; i++ {
|
||||
name := prefix + "_" + strconv.FormatInt(i, 10) + path.Ext(args.Name)
|
||||
name = path.Join(p, name)
|
||||
info.Fragments = append(info.Fragments, name)
|
||||
args.Name = name
|
||||
put, err := c.i.ApplyPut(ctx, &ApplyPutArgs{
|
||||
Bucket: info.Bucket,
|
||||
Name: name,
|
||||
Effective: args.EffectiveTime,
|
||||
Header: args.Header,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
putURLs = append(putURLs, put.URL)
|
||||
}
|
||||
data, err := json.Marshal(&info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.kv.Set(ctx, c.putKey(prefix), string(data), args.EffectiveTime); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var fragmentSize int64
|
||||
if pack == 1 {
|
||||
fragmentSize = args.Size
|
||||
} else {
|
||||
fragmentSize = args.FragmentSize
|
||||
}
|
||||
return &PutAddr{
|
||||
PutURLs: putURLs,
|
||||
FragmentSize: fragmentSize,
|
||||
PutID: prefix,
|
||||
EffectiveTime: effective,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Controller) ConfirmPut(ctx context.Context, putID string) (*ObjectInfo, error) {
|
||||
data, err := c.kv.Get(ctx, c.putKey(putID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var info putInfo
|
||||
if err := json.Unmarshal([]byte(data), &info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var total int64
|
||||
src := make([]BucketFile, len(info.Fragments))
|
||||
for i, fragment := range info.Fragments {
|
||||
state, err := c.i.GetObjectInfo(ctx, &BucketFile{
|
||||
Bucket: info.Bucket,
|
||||
Name: fragment,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
total += state.Size
|
||||
src[i] = BucketFile{
|
||||
Bucket: info.Bucket,
|
||||
Name: fragment,
|
||||
}
|
||||
}
|
||||
if total != info.Size {
|
||||
return nil, fmt.Errorf("incomplete upload %d/%d", total, info.Size)
|
||||
}
|
||||
var dst *BucketFile
|
||||
if info.ClearTime == nil {
|
||||
dst = &BucketFile{
|
||||
Bucket: c.i.PermanentBucket(),
|
||||
Name: info.Name,
|
||||
}
|
||||
} else {
|
||||
dst = &BucketFile{
|
||||
Bucket: c.i.ClearBucket(),
|
||||
Name: info.Name,
|
||||
}
|
||||
}
|
||||
if err := c.i.MergeObjectInfo(ctx, src, dst); err != nil { // SourceInfo 0 is too small (2) and it is not the last part
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.i.GetObjectInfo(ctx, dst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
err := c.kv.Del(ctx, c.putKey(putID))
|
||||
if err != nil {
|
||||
log.Println("del key:", err)
|
||||
}
|
||||
for _, b := range src {
|
||||
err = c.i.DeleteObjectInfo(ctx, &b)
|
||||
if err != nil {
|
||||
log.Println("del obj:", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (c *Controller) Prefix(args *PutArgs) string {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
buf.WriteString(args.Name)
|
||||
buf.WriteString("~~~@~@~~~")
|
||||
buf.WriteString(strconv.FormatInt(args.Size, 10))
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(args.Hash)
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(strconv.FormatInt(int64(args.ClearTime), 10))
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(strconv.FormatInt(int64(args.EffectiveTime), 10))
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(c.i.Name())
|
||||
r := make([]byte, 16)
|
||||
rand.Read(r)
|
||||
buf.Write(r)
|
||||
md5v := md5.Sum(buf.Bytes())
|
||||
return hex.EncodeToString(md5v[:])
|
||||
}
|
||||
|
||||
type putInfo struct {
|
||||
Bucket string
|
||||
Fragments []string
|
||||
FragmentSize int64
|
||||
Size int64
|
||||
Name string
|
||||
Hash string
|
||||
ClearTime *int64
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type KV interface {
|
||||
Get(ctx context.Context, key string) (string, error)
|
||||
Set(ctx context.Context, key string, val string, expiration time.Duration) error
|
||||
Del(ctx context.Context, key string) error
|
||||
IsNotFound(err error) bool
|
||||
}
|
||||
|
||||
func NewKV() KV {
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "",
|
||||
Username: "",
|
||||
Password: "",
|
||||
})
|
||||
return &redisImpl{
|
||||
rdb: rdb,
|
||||
}
|
||||
}
|
||||
|
||||
type redisImpl struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func (r *redisImpl) Del(ctx context.Context, key string) error {
|
||||
log.Println("redis del", key)
|
||||
return r.rdb.Del(ctx, key).Err()
|
||||
}
|
||||
|
||||
func (r *redisImpl) Get(ctx context.Context, key string) (string, error) {
|
||||
log.Println("redis get", key)
|
||||
return r.rdb.Get(ctx, key).Result()
|
||||
}
|
||||
|
||||
func (r *redisImpl) Set(ctx context.Context, key string, val string, expiration time.Duration) error {
|
||||
log.Println("redis set", key, val, expiration.String())
|
||||
return r.rdb.Set(ctx, key, val, expiration).Err()
|
||||
}
|
||||
|
||||
func (r *redisImpl) IsNotFound(err error) bool {
|
||||
return err == redis.Nil
|
||||
}
|
@ -1,112 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"path"
|
||||
"time"
|
||||
)
|
||||
|
||||
func HttpPut(url string, body io.Reader) error {
|
||||
req, err := http.NewRequest(http.MethodPut, url, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client := http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("http [%s] %s", resp.Status, data)
|
||||
}
|
||||
if len(data) > 0 {
|
||||
log.Println("[http body]", string(data))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Md5(p []byte) string {
|
||||
t := md5.Sum(p)
|
||||
return hex.EncodeToString(t[:])
|
||||
}
|
||||
|
||||
func Main() {
|
||||
ctx := context.Background()
|
||||
c, err := NewController(&minioImpl{}, NewKV())
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
name := "hello.txt"
|
||||
data := []byte("hello world")
|
||||
|
||||
userID := "10000"
|
||||
|
||||
name = path.Join("user_"+userID, name)
|
||||
|
||||
addr, err := c.ApplyPut(ctx, &FragmentPutArgs{
|
||||
PutArgs: PutArgs{
|
||||
Name: name,
|
||||
Size: int64(len(data)),
|
||||
Hash: Md5(data),
|
||||
EffectiveTime: time.Second * 60 * 60,
|
||||
},
|
||||
FragmentSize: 2,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
fmt.Println()
|
||||
fmt.Println()
|
||||
|
||||
if addr.ResourceURL != "" {
|
||||
log.Println("服务器已经存在")
|
||||
return
|
||||
}
|
||||
var (
|
||||
start int
|
||||
end = int(addr.FragmentSize)
|
||||
)
|
||||
|
||||
for _, u := range addr.PutURLs {
|
||||
if start >= len(data) {
|
||||
break
|
||||
}
|
||||
if end > len(data) {
|
||||
end = len(data)
|
||||
}
|
||||
_ = u
|
||||
page := data[start:end]
|
||||
fmt.Print(string(page))
|
||||
start += int(addr.FragmentSize)
|
||||
end += int(addr.FragmentSize)
|
||||
err = HttpPut(u, bytes.NewReader(page))
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}
|
||||
fmt.Println()
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("[PUT_ID]", addr.PutID)
|
||||
|
||||
info, err := c.ConfirmPut(ctx, addr.PutID)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
log.Printf("%+v\n", info)
|
||||
|
||||
log.Println("success")
|
||||
}
|
@ -1,147 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/minio/minio-go"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewMinio() Interface {
|
||||
return &minioImpl{}
|
||||
}
|
||||
|
||||
type minioImpl struct {
|
||||
uploadBucket string // 上传桶
|
||||
permanentBucket string // 永久桶
|
||||
clearBucket string // 自动清理桶
|
||||
client *minio.Client
|
||||
}
|
||||
|
||||
func (m *minioImpl) Init() error {
|
||||
client, err := minio.New("127.0.0.1:9000", "minioadmin", "minioadmin", false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("minio client error: %w", err)
|
||||
}
|
||||
m.client = client
|
||||
m.uploadBucket = "upload"
|
||||
m.permanentBucket = "permanent"
|
||||
m.clearBucket = "clear"
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *minioImpl) Name() string {
|
||||
return "minio"
|
||||
}
|
||||
|
||||
func (m *minioImpl) MinMultipartSize() int64 {
|
||||
return 1024 * 1024 * 5 // minio.absMinPartSize
|
||||
}
|
||||
|
||||
func (m *minioImpl) UploadBucket() string {
|
||||
return m.uploadBucket
|
||||
}
|
||||
|
||||
func (m *minioImpl) PermanentBucket() string {
|
||||
return m.permanentBucket
|
||||
}
|
||||
|
||||
func (m *minioImpl) ClearBucket() string {
|
||||
return m.clearBucket
|
||||
}
|
||||
|
||||
func (m *minioImpl) urlReplace(u *url.URL) {
|
||||
|
||||
}
|
||||
|
||||
func (m *minioImpl) ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) {
|
||||
if args.Effective <= 0 {
|
||||
return nil, errors.New("EffectiveTime <= 0")
|
||||
}
|
||||
_, err := m.GetObjectInfo(ctx, &BucketFile{
|
||||
Bucket: m.uploadBucket,
|
||||
Name: args.Name,
|
||||
})
|
||||
if err == nil {
|
||||
return nil, fmt.Errorf("minio bucket %s name %s already exists", args.Bucket, args.Name)
|
||||
} else if !m.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
effective := time.Now().Add(args.Effective)
|
||||
u, err := m.client.PresignedPutObject(m.uploadBucket, args.Name, args.Effective)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("minio apply error: %w", err)
|
||||
}
|
||||
m.urlReplace(u)
|
||||
return &PutRes{
|
||||
URL: u.String(),
|
||||
Bucket: m.uploadBucket,
|
||||
Name: args.Name,
|
||||
EffectiveTime: effective,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) {
|
||||
info, err := m.client.StatObject(args.Bucket, args.Name, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ObjectInfo{
|
||||
URL: "", // todo
|
||||
Size: info.Size,
|
||||
Hash: info.ETag,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *minioImpl) CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil))
|
||||
}
|
||||
|
||||
func (m *minioImpl) DeleteObjectInfo(ctx context.Context, info *BucketFile) error {
|
||||
return m.client.RemoveObject(info.Bucket, info.Name)
|
||||
}
|
||||
|
||||
func (m *minioImpl) MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
if err := m.CopyObjectInfo(ctx, src, dst); err != nil {
|
||||
return err
|
||||
}
|
||||
return m.DeleteObjectInfo(ctx, src)
|
||||
}
|
||||
|
||||
func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error {
|
||||
switch len(src) {
|
||||
case 0:
|
||||
return errors.New("src empty")
|
||||
case 1:
|
||||
return m.CopyObjectInfo(ctx, &src[0], dst)
|
||||
}
|
||||
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sources := make([]minio.SourceInfo, len(src))
|
||||
for i, s := range src {
|
||||
sources[i] = minio.NewSourceInfo(s.Bucket, s.Name, nil)
|
||||
}
|
||||
return m.client.ComposeObject(destination, sources) // todo
|
||||
}
|
||||
|
||||
func (m *minioImpl) IsNotFound(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
switch e := err.(type) {
|
||||
case minio.ErrorResponse:
|
||||
return e.StatusCode == 404 && e.Code == "NoSuchKey"
|
||||
case *minio.ErrorResponse:
|
||||
return e.StatusCode == 404 && e.Code == "NoSuchKey"
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import "context"
|
||||
|
||||
type Interface interface {
|
||||
Init() error
|
||||
Name() string
|
||||
MinMultipartSize() int64
|
||||
UploadBucket() string
|
||||
PermanentBucket() string
|
||||
ClearBucket() string
|
||||
ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error)
|
||||
GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error)
|
||||
CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
DeleteObjectInfo(ctx context.Context, info *BucketFile) error
|
||||
MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error
|
||||
IsNotFound(err error) bool
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
package objstorage
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PutRes struct {
|
||||
URL string
|
||||
Bucket string
|
||||
Name string
|
||||
EffectiveTime time.Time
|
||||
}
|
||||
|
||||
type FragmentPutArgs struct {
|
||||
PutArgs
|
||||
FragmentSize int64 // 分片大小
|
||||
}
|
||||
|
||||
type PutArgs struct {
|
||||
Name string // 文件名
|
||||
Size int64 // 大小
|
||||
Hash string // md5
|
||||
Prefix string // 前缀
|
||||
ClearTime time.Duration // 自动清理时间
|
||||
EffectiveTime time.Duration // 申请有效时间
|
||||
Header http.Header // header
|
||||
}
|
||||
|
||||
type BucketFile struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type ObjectInfo struct {
|
||||
URL string
|
||||
Size int64
|
||||
Hash string
|
||||
}
|
||||
|
||||
//type PutSpace struct {
|
||||
// URL string
|
||||
// EffectiveTime time.Time
|
||||
//}
|
||||
|
||||
type PutAddr struct {
|
||||
ResourceURL string
|
||||
PutID string
|
||||
FragmentSize int64
|
||||
EffectiveTime time.Time
|
||||
PutURLs []string
|
||||
}
|
||||
|
||||
type KVData struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type PutResp struct {
|
||||
URL string
|
||||
Time *time.Time
|
||||
}
|
||||
|
||||
type ApplyPutArgs struct {
|
||||
Bucket string
|
||||
Name string
|
||||
Effective time.Duration // 申请有效时间
|
||||
Header http.Header // header
|
||||
}
|
@ -127,7 +127,6 @@ message SetMessageReactionExtensionsResp {
|
||||
repeated KeyValueResp result = 4;
|
||||
}
|
||||
|
||||
|
||||
message GetMessagesReactionExtensionsReq {
|
||||
string sourceID = 1;
|
||||
int32 sessionType = 2;
|
||||
|
@ -3,6 +3,7 @@ package rpcclient
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
@ -13,12 +14,53 @@ import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
func newContentTypeConf() map[int32]config.NotificationConf {
|
||||
return map[int32]config.NotificationConf{
|
||||
// group
|
||||
constant.GroupCreatedNotification: config.Config.Notification.GroupCreated,
|
||||
constant.GroupInfoSetNotification: config.Config.Notification.GroupInfoSet,
|
||||
constant.JoinGroupApplicationNotification: config.Config.Notification.JoinGroupApplication,
|
||||
constant.MemberQuitNotification: config.Config.Notification.MemberQuit,
|
||||
constant.GroupApplicationAcceptedNotification: config.Config.Notification.GroupApplicationAccepted,
|
||||
constant.GroupApplicationRejectedNotification: config.Config.Notification.GroupApplicationRejected,
|
||||
constant.GroupOwnerTransferredNotification: config.Config.Notification.GroupOwnerTransferred,
|
||||
constant.MemberKickedNotification: config.Config.Notification.MemberKicked,
|
||||
constant.MemberInvitedNotification: config.Config.Notification.MemberInvited,
|
||||
constant.MemberEnterNotification: config.Config.Notification.MemberEnter,
|
||||
constant.GroupDismissedNotification: config.Config.Notification.GroupDismissed,
|
||||
constant.GroupMutedNotification: config.Config.Notification.GroupMuted,
|
||||
constant.GroupCancelMutedNotification: config.Config.Notification.GroupCancelMuted,
|
||||
constant.GroupMemberMutedNotification: config.Config.Notification.GroupMemberMuted,
|
||||
constant.GroupMemberCancelMutedNotification: config.Config.Notification.GroupMemberCancelMuted,
|
||||
constant.GroupMemberInfoSetNotification: config.Config.Notification.GroupMemberInfoSet,
|
||||
constant.GroupMemberSetToAdminNotification: config.Config.Notification.GroupMemberSetToAdmin,
|
||||
constant.GroupMemberSetToOrdinaryUserNotification: config.Config.Notification.GroupMemberSetToOrdinary,
|
||||
// user
|
||||
constant.UserInfoUpdatedNotification: config.Config.Notification.UserInfoUpdated,
|
||||
// friend
|
||||
constant.FriendApplicationNotification: config.Config.Notification.FriendApplication,
|
||||
constant.FriendApplicationApprovedNotification: config.Config.Notification.FriendApplicationApproved,
|
||||
constant.FriendApplicationRejectedNotification: config.Config.Notification.FriendApplicationRejected,
|
||||
constant.FriendAddedNotification: config.Config.Notification.FriendAdded,
|
||||
constant.FriendDeletedNotification: config.Config.Notification.FriendDeleted,
|
||||
constant.FriendRemarkSetNotification: config.Config.Notification.FriendRemarkSet,
|
||||
constant.BlackAddedNotification: config.Config.Notification.BlackAdded,
|
||||
constant.BlackDeletedNotification: config.Config.Notification.BlackDeleted,
|
||||
constant.FriendInfoUpdatedNotification: config.Config.Notification.FriendInfoUpdated,
|
||||
// conversation
|
||||
constant.ConversationChangeNotification: config.Config.Notification.ConversationChanged,
|
||||
constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged,
|
||||
constant.ConversationPrivateChatNotification: config.Config.Notification.ConversationSetPrivate,
|
||||
}
|
||||
}
|
||||
|
||||
type MsgClient struct {
|
||||
*MetaClient
|
||||
contentTypeConf map[int32]config.NotificationConf
|
||||
}
|
||||
|
||||
func NewMsgClient(zk discoveryregistry.SvcDiscoveryRegistry) *MsgClient {
|
||||
return &MsgClient{NewMetaClient(zk, config.Config.RpcRegisterName.OpenImMsgName)}
|
||||
return &MsgClient{MetaClient: NewMetaClient(zk, config.Config.RpcRegisterName.OpenImMsgName), contentTypeConf: newContentTypeConf()}
|
||||
}
|
||||
|
||||
func (m *MsgClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
|
||||
@ -48,9 +90,10 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, cfg config.NotificationConf, opts ...utils.OptionsOpt) error {
|
||||
func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...utils.OptionsOpt) error {
|
||||
content, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgClient Notification json.Marshal failed", err)
|
||||
return err
|
||||
}
|
||||
var req msg.SendMsgReq
|
||||
@ -68,7 +111,7 @@ func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, con
|
||||
// msg.Options = make(map[string]bool, 7)
|
||||
// todo notification get sender name and face url
|
||||
// msg.SenderNickname, msg.SenderFaceURL, err = c.getFaceURLAndName(sendID)
|
||||
options := config.GetOptionsByNotification(cfg)
|
||||
options := config.GetOptionsByNotification(c.contentTypeConf[contentType])
|
||||
options = utils.WithOptions(options, opts...)
|
||||
msg.Options = options
|
||||
offlineInfo.Title = title
|
||||
@ -77,5 +120,10 @@ func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, con
|
||||
msg.OfflinePushInfo = &offlineInfo
|
||||
req.MsgData = &msg
|
||||
_, err = c.SendMsg(ctx, &req)
|
||||
if err == nil {
|
||||
log.ZDebug(ctx, "MsgClient Notification SendMsg success", "req", &req)
|
||||
} else {
|
||||
log.ZError(ctx, "MsgClient Notification SendMsg failed %s\n", err, "req", &req)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -2,8 +2,8 @@ package notification
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
@ -218,19 +218,21 @@ func (g *GroupNotificationSender) getFromToUserNickname(ctx context.Context, fro
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) groupNotification(ctx context.Context, contentType int32, m proto.Message, sendID, groupID, recvUserID string) (err error) {
|
||||
content, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
notificationMsg := rpcclient.NotificationMsg{
|
||||
SendID: sendID,
|
||||
RecvID: recvUserID,
|
||||
ContentType: contentType,
|
||||
SessionType: constant.GroupChatType,
|
||||
MsgFrom: constant.SysMsgType,
|
||||
Content: content,
|
||||
}
|
||||
return g.msgClient.Notification(ctx, ¬ificationMsg)
|
||||
//content, err := json.Marshal(m)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//notificationMsg := rpcclient.NotificationMsg{
|
||||
// SendID: sendID,
|
||||
// RecvID: recvUserID,
|
||||
// ContentType: contentType,
|
||||
// SessionType: constant.GroupChatType,
|
||||
// MsgFrom: constant.SysMsgType,
|
||||
// Content: content,
|
||||
//}
|
||||
//return g.msgClient.Notification(ctx, ¬ificationMsg)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, group *relation.GroupModel, members []*relation.GroupMemberModel, userMap map[string]*sdkws.UserInfo) (err error) {
|
||||
@ -317,12 +319,6 @@ func (g *GroupNotificationSender) mergeGroupFull(ctx context.Context, groupID st
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, group *relation.GroupModel, members []*relation.GroupMemberModel, needVerification *int32) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
groupInfo, err := g.mergeGroupFull(ctx, group.GroupID, group, &members, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -331,9 +327,11 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context,
|
||||
if needVerification != nil {
|
||||
groupInfoChangedTips.Group.NeedVerification = *needVerification
|
||||
}
|
||||
return g.groupNotification(ctx, constant.GroupInfoSetNotification, groupInfoChangedTips, mcontext.GetOpUserID(ctx), group.GroupID, "")
|
||||
return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupInfoSetNotification, constant.SuperGroupChatType, groupInfoChangedTips)
|
||||
}
|
||||
|
||||
// ####################################
|
||||
|
||||
func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbGroup.JoinGroupReq) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
@ -356,6 +354,7 @@ func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.C
|
||||
joinGroupApplicationTips := &sdkws.JoinGroupApplicationTips{Group: group, Applicant: user, ReqMsg: req.ReqMessage}
|
||||
for _, userID := range userIDs {
|
||||
err := g.groupNotification(ctx, constant.JoinGroupApplicationNotification, joinGroupApplicationTips, mcontext.GetOpUserID(ctx), "", userID)
|
||||
err = g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.JoinGroupApplicationNotification, constant.SuperGroupChatType, joinGroupApplicationTips, config.Config.Notification.JoinGroupApplication)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "JoinGroupApplicationNotification failed", err, "group", req.GroupID, "userID", userID)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user