mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-26 21:22:16 +08:00
update gRPC Implement.
This commit is contained in:
parent
d3aca9d88f
commit
154a19eed1
@ -56,6 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
|
msggateway.UnimplementedMsgGatewayServer
|
||||||
rpcPort int
|
rpcPort int
|
||||||
LongConnServer LongConnServer
|
LongConnServer LongConnServer
|
||||||
config *Config
|
config *Config
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type pushServer struct {
|
type pushServer struct {
|
||||||
|
pbpush.UnimplementedPushMsgServiceServer
|
||||||
database controller.PushDatabase
|
database controller.PushDatabase
|
||||||
disCov discovery.SvcDiscoveryRegistry
|
disCov discovery.SvcDiscoveryRegistry
|
||||||
offlinePusher offlinepush.OfflinePusher
|
offlinePusher offlinepush.OfflinePusher
|
||||||
|
|||||||
@ -40,6 +40,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type authServer struct {
|
type authServer struct {
|
||||||
|
pbauth.UnimplementedAuthServer
|
||||||
authDatabase controller.AuthDatabase
|
authDatabase controller.AuthDatabase
|
||||||
userRpcClient *rpcclient.UserRpcClient
|
userRpcClient *rpcclient.UserRpcClient
|
||||||
RegisterCenter discovery.SvcDiscoveryRegistry
|
RegisterCenter discovery.SvcDiscoveryRegistry
|
||||||
|
|||||||
@ -43,6 +43,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type conversationServer struct {
|
type conversationServer struct {
|
||||||
|
pbconversation.UnimplementedConversationServer
|
||||||
msgRpcClient *rpcclient.MessageRpcClient
|
msgRpcClient *rpcclient.MessageRpcClient
|
||||||
user *rpcclient.UserRpcClient
|
user *rpcclient.UserRpcClient
|
||||||
groupRpcClient *rpcclient.GroupRpcClient
|
groupRpcClient *rpcclient.GroupRpcClient
|
||||||
|
|||||||
@ -57,6 +57,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type groupServer struct {
|
type groupServer struct {
|
||||||
|
pbgroup.UnimplementedGroupServer
|
||||||
db controller.GroupDatabase
|
db controller.GroupDatabase
|
||||||
user rpcclient.UserRpcClient
|
user rpcclient.UserRpcClient
|
||||||
notification *GroupNotificationSender
|
notification *GroupNotificationSender
|
||||||
|
|||||||
@ -55,6 +55,7 @@ type (
|
|||||||
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
|
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
|
||||||
config *Config // Global configuration settings.
|
config *Config // Global configuration settings.
|
||||||
webhookClient *webhook.Client
|
webhookClient *webhook.Client
|
||||||
|
msg.UnimplementedMsgServer
|
||||||
}
|
}
|
||||||
|
|
||||||
Config struct {
|
Config struct {
|
||||||
|
|||||||
@ -43,6 +43,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type friendServer struct {
|
type friendServer struct {
|
||||||
|
relation.UnimplementedFriendServer
|
||||||
db controller.FriendDatabase
|
db controller.FriendDatabase
|
||||||
blackDatabase controller.BlackDatabase
|
blackDatabase controller.BlackDatabase
|
||||||
userRpcClient *rpcclient.UserRpcClient
|
userRpcClient *rpcclient.UserRpcClient
|
||||||
|
|||||||
@ -290,74 +290,87 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
|
|||||||
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
||||||
var conf config.Third
|
var conf config.Third
|
||||||
expireTime := time.UnixMilli(req.ExpireTime)
|
expireTime := time.UnixMilli(req.ExpireTime)
|
||||||
var deltotal int
|
|
||||||
excuteNum := 5
|
|
||||||
|
|
||||||
findPagination := &sdkws.RequestPagination{
|
findPagination := &sdkws.RequestPagination{
|
||||||
PageNumber: 1,
|
PageNumber: 1,
|
||||||
ShowNumber: 1000,
|
ShowNumber: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < excuteNum; i++ {
|
log.ZDebug(ctx, "del type is ", "needDelType", req.ObjectGroup)
|
||||||
// Find all expired data in S3 database
|
|
||||||
total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
|
// Find all expired data in S3 database
|
||||||
|
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
|
||||||
|
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
|
||||||
|
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
needDelObjectKeys := make([]string, len(models))
|
||||||
|
for _, model := range models {
|
||||||
|
needDelObjectKeys = append(needDelObjectKeys, model.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove duplicate keys, have the same key use in different models
|
||||||
|
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
|
||||||
|
|
||||||
|
for _, key := range needDelObjectKeys {
|
||||||
|
// Find all models by key
|
||||||
|
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
|
||||||
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
needDelObjectKeys := make([]string, len(models))
|
|
||||||
for _, model := range models {
|
// check keyModels, if all keyModels.
|
||||||
needDelObjectKeys = append(needDelObjectKeys, model.Key)
|
needDelKey := true // Default can delete
|
||||||
|
for _, keymodel := range keyModels {
|
||||||
|
// If group is empty or CreateTime is after expireTime, can't delete this key
|
||||||
|
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
|
||||||
|
needDelKey = false
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove duplicate keys, have the same key use in different models
|
// If this object is not referenced by not expire data, delete it
|
||||||
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
|
if needDelKey && t.minio != nil {
|
||||||
|
// If have a thumbnail, delete it
|
||||||
for _, key := range needDelObjectKeys {
|
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
|
||||||
// Find all models by key
|
if thumbnailKey != "" {
|
||||||
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
|
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
|
||||||
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
|
||||||
}
|
|
||||||
|
|
||||||
// check keyModels, if all keyModels.
|
|
||||||
needDelKey := true // Default can delete
|
|
||||||
for _, model := range keyModels {
|
|
||||||
// If group is empty or CreateTime is after expireTime, can't delete this key
|
|
||||||
if model.Group == "" || model.CreateTime.After(expireTime) {
|
|
||||||
needDelKey = false
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this object is not referenced by not expire data, delete it
|
// Delete object
|
||||||
if needDelKey && t.minio != nil {
|
err = t.s3dataBase.DeleteObject(ctx, key)
|
||||||
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
|
|
||||||
|
|
||||||
t.s3dataBase.DeleteObject(ctx, thumbnailKey)
|
|
||||||
t.s3dataBase.DeleteObject(ctx, key)
|
|
||||||
|
|
||||||
t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, model := range models {
|
|
||||||
// Delete all expired data row in S3 database
|
|
||||||
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete cache key
|
||||||
|
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
|
||||||
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if total < int64(findPagination.ShowNumber) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
deltotal += int(total)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal)
|
// handle delete data in S3 database
|
||||||
|
for _, model := range models {
|
||||||
|
// Delete all expired data row in S3 database
|
||||||
|
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &third.DeleteOutdatedDataResp{}, nil
|
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)
|
||||||
|
|
||||||
|
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type FormDataMate struct {
|
type FormDataMate struct {
|
||||||
|
|||||||
@ -38,6 +38,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type thirdServer struct {
|
type thirdServer struct {
|
||||||
|
third.UnimplementedThirdServer
|
||||||
thirdDatabase controller.ThirdDatabase
|
thirdDatabase controller.ThirdDatabase
|
||||||
s3dataBase controller.S3Database
|
s3dataBase controller.S3Database
|
||||||
userRpcClient rpcclient.UserRpcClient
|
userRpcClient rpcclient.UserRpcClient
|
||||||
|
|||||||
@ -52,6 +52,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type userServer struct {
|
type userServer struct {
|
||||||
|
pbuser.UnimplementedUserServer
|
||||||
online cache.OnlineCache
|
online cache.OnlineCache
|
||||||
db controller.UserDatabase
|
db controller.UserDatabase
|
||||||
friendNotificationSender *relation.FriendNotificationSender
|
friendNotificationSender *relation.FriendNotificationSender
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user