mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-19 04:37:01 +08:00
Merge remote-tracking branch 'upstream/main'
This commit is contained in:
commit
84d7591738
@ -21,7 +21,7 @@ import (
|
||||
|
||||
func main() {
|
||||
cronTaskCmd := cmd.NewCronTaskCmd()
|
||||
if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil {
|
||||
if err := cronTaskCmd.Exec(tools.StartTask); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,15 +124,19 @@ api:
|
||||
# Session token
|
||||
# Configuration for Tencent COS
|
||||
# Configuration for Aliyun OSS
|
||||
# apiURL is the address of the api, the access address of the app, use s3 must be configured
|
||||
# minio.endpoint can be configured as an intranet address,
|
||||
# minio.signEndpoint is minio public network address
|
||||
object:
|
||||
enable: "minio"
|
||||
apiURL: http://127.0.0.1:10002/object/
|
||||
apiURL: "http://127.0.0.1:10002"
|
||||
minio:
|
||||
bucket: "openim"
|
||||
endpoint: http://127.0.0.1:10005
|
||||
accessKeyID: root
|
||||
secretAccessKey: openIM123
|
||||
endpoint: "http://127.0.0.1:10005"
|
||||
accessKeyID: "root"
|
||||
secretAccessKey: "openIM123"
|
||||
sessionToken: ""
|
||||
signEndpoint: "http://127.0.0.1:10005"
|
||||
cos:
|
||||
bucketURL: "https://temp-1252357374.cos.ap-chengdu.myqcloud.com"
|
||||
secretID: ""
|
||||
@ -142,7 +146,7 @@ object:
|
||||
endpoint: "https://oss-cn-chengdu.aliyuncs.com"
|
||||
bucket: "demo-9999999"
|
||||
bucketURL: "https://demo-9999999.oss-cn-chengdu.aliyuncs.com"
|
||||
accessKeyID: root
|
||||
accessKeyID: ""
|
||||
accessKeySecret: ""
|
||||
sessionToken: ""
|
||||
|
||||
|
||||
4
go.mod
4
go.mod
@ -25,7 +25,7 @@ require (
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
github.com/stretchr/testify v1.8.4
|
||||
go.mongodb.org/mongo-driver v1.12.1
|
||||
golang.org/x/image v0.9.0 // indirect
|
||||
golang.org/x/image v0.11.0
|
||||
google.golang.org/api v0.135.0
|
||||
google.golang.org/grpc v1.57.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
@ -121,7 +121,7 @@ require (
|
||||
golang.org/x/oauth2 v0.10.0 // indirect
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/sys v0.10.0 // indirect
|
||||
golang.org/x/text v0.11.0 // indirect
|
||||
golang.org/x/text v0.12.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
|
||||
8
go.sum
8
go.sum
@ -364,8 +364,8 @@ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0
|
||||
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
|
||||
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/image v0.9.0 h1:QrzfX26snvCM20hIhBwuHI/ThTg18b/+kcKdXHvnR+g=
|
||||
golang.org/x/image v0.9.0/go.mod h1:jtrku+n79PfroUbvDdeUWMAI+heR786BofxrbiSF+J0=
|
||||
golang.org/x/image v0.11.0 h1:ds2RoQvBvYTiJkwpSFDwCcDFNX7DqjL2WsUgTNk0Ooo=
|
||||
golang.org/x/image v0.11.0/go.mod h1:bglhjqbqVuEb9e9+eNR45Jfu7D+T4Qan+NhQk8Ck2P8=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
@ -442,8 +442,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
|
||||
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@ -83,7 +83,14 @@ func (o *ThirdApi) ObjectRedirect(c *gin.Context) {
|
||||
operationID = strconv.Itoa(rand.Int())
|
||||
}
|
||||
ctx := mcontext.SetOperationID(c, operationID)
|
||||
resp, err := o.Client.AccessURL(ctx, &third.AccessURLReq{Name: name})
|
||||
query := make(map[string]string)
|
||||
for key, values := range c.Request.URL.Query() {
|
||||
if len(values) == 0 {
|
||||
continue
|
||||
}
|
||||
query[key] = values[0]
|
||||
}
|
||||
resp, err := o.Client.AccessURL(ctx, &third.AccessURLReq{Name: name, Query: query})
|
||||
if err != nil {
|
||||
if errs.ErrArgs.Is(err) {
|
||||
c.String(http.StatusBadRequest, err.Error())
|
||||
|
||||
@ -145,7 +145,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
len(modifyMsgList),
|
||||
)
|
||||
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
||||
conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message)
|
||||
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
||||
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
||||
och.handleNotification(
|
||||
ctx,
|
||||
|
||||
@ -161,6 +161,9 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
||||
if req.OwnerUserID == "" {
|
||||
return nil, errs.ErrArgs.Wrap("no group owner")
|
||||
}
|
||||
if req.GroupInfo.GroupType != constant.WorkingGroup {
|
||||
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("group type %d not support", req.GroupInfo.GroupType))
|
||||
}
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -690,8 +693,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
|
||||
return nil, errs.ErrGroupRequestHandled.Wrap("group request already processed")
|
||||
}
|
||||
var inGroup bool
|
||||
_, err = s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.FromUserID)
|
||||
if err == nil {
|
||||
if _, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil {
|
||||
inGroup = true // 已经在群里了
|
||||
} else if !s.IsNotFound(err) {
|
||||
return nil, err
|
||||
@ -718,6 +720,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
log.ZDebug(ctx, "GroupApplicationResponse", "inGroup", inGroup, "HandleResult", req.HandleResult, "member", member)
|
||||
if err := s.GroupDatabase.HandlerGroupRequest(ctx, req.GroupID, req.FromUserID, req.HandledMsg, req.HandleResult, member); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -727,12 +730,14 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
|
||||
return nil, err
|
||||
}
|
||||
s.Notification.GroupApplicationAcceptedNotification(ctx, req)
|
||||
if member == nil {
|
||||
log.ZDebug(ctx, "GroupApplicationResponse", "member is nil")
|
||||
} else {
|
||||
s.Notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID)
|
||||
}
|
||||
case constant.GroupResponseRefuse:
|
||||
s.Notification.GroupApplicationRejectedNotification(ctx, req)
|
||||
}
|
||||
if member != nil {
|
||||
s.Notification.MemberEnterNotification(ctx, req)
|
||||
}
|
||||
return &pbGroup.GroupApplicationResponseResp{}, nil
|
||||
}
|
||||
|
||||
@ -778,7 +783,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
|
||||
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Notification.MemberEnterDirectlyNotification(ctx, req.GroupID, req.InviterUserID)
|
||||
s.Notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
|
||||
return resp, nil
|
||||
}
|
||||
groupRequest := relationTb.GroupRequestModel{
|
||||
|
||||
@ -116,41 +116,75 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
|
||||
if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
sendIDs []string
|
||||
recvIDs []string
|
||||
groupIDs []string
|
||||
sendMap = make(map[string]string)
|
||||
recvMap = make(map[string]string)
|
||||
groupMap = make(map[string]*sdkws.GroupInfo)
|
||||
)
|
||||
for _, chatLog := range chatLogs {
|
||||
if chatLog.SenderNickname == "" {
|
||||
sendIDs = append(sendIDs, chatLog.SendID)
|
||||
}
|
||||
switch chatLog.SessionType {
|
||||
case constant.SingleChatType:
|
||||
recvIDs = append(recvIDs, chatLog.RecvID)
|
||||
case constant.GroupChatType, constant.SuperGroupChatType:
|
||||
groupIDs = append(groupIDs, chatLog.GroupID)
|
||||
}
|
||||
}
|
||||
if len(sendIDs) != 0 {
|
||||
sendInfos, err := m.User.GetUsersInfo(ctx, sendIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, sendInfo := range sendInfos {
|
||||
sendMap[sendInfo.UserID] = sendInfo.Nickname
|
||||
}
|
||||
}
|
||||
if len(recvIDs) != 0 {
|
||||
recvInfos, err := m.User.GetUsersInfo(ctx, recvIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, recvInfo := range recvInfos {
|
||||
recvMap[recvInfo.UserID] = recvInfo.Nickname
|
||||
}
|
||||
}
|
||||
if len(groupIDs) != 0 {
|
||||
groupInfos, err := m.Group.GetGroupInfos(ctx, groupIDs, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, groupInfo := range groupInfos {
|
||||
groupMap[groupInfo.GroupID] = groupInfo
|
||||
}
|
||||
}
|
||||
for _, chatLog := range chatLogs {
|
||||
pbChatLog := &msg.ChatLog{}
|
||||
utils.CopyStructFields(pbChatLog, chatLog)
|
||||
pbChatLog.SendTime = chatLog.SendTime
|
||||
pbChatLog.CreateTime = chatLog.CreateTime
|
||||
if chatLog.SenderNickname == "" {
|
||||
sendUser, err := m.User.GetUserInfo(ctx, chatLog.SendID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pbChatLog.SenderNickname = sendUser.Nickname
|
||||
pbChatLog.SenderNickname = sendMap[chatLog.SendID]
|
||||
}
|
||||
switch chatLog.SessionType {
|
||||
case constant.SingleChatType:
|
||||
recvUser, err := m.User.GetUserInfo(ctx, chatLog.RecvID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pbChatLog.RecvNickname = recvUser.Nickname
|
||||
pbChatLog.RecvNickname = recvMap[chatLog.RecvID]
|
||||
|
||||
case constant.GroupChatType, constant.SuperGroupChatType:
|
||||
group, err := m.Group.GetGroupInfo(ctx, chatLog.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pbChatLog.SenderFaceURL = group.FaceURL
|
||||
pbChatLog.GroupMemberCount = group.MemberCount
|
||||
pbChatLog.RecvID = group.GroupID
|
||||
pbChatLog.GroupName = group.GroupName
|
||||
pbChatLog.GroupOwner = group.OwnerUserID
|
||||
pbChatLog.GroupType = group.GroupType
|
||||
pbChatLog.SenderFaceURL = groupMap[chatLog.GroupID].FaceURL
|
||||
pbChatLog.GroupMemberCount = groupMap[chatLog.GroupID].MemberCount
|
||||
pbChatLog.RecvID = groupMap[chatLog.GroupID].GroupID
|
||||
pbChatLog.GroupName = groupMap[chatLog.GroupID].GroupName
|
||||
pbChatLog.GroupOwner = groupMap[chatLog.GroupID].OwnerUserID
|
||||
pbChatLog.GroupType = groupMap[chatLog.GroupID].GroupType
|
||||
}
|
||||
resp.ChatLogs = append(resp.ChatLogs, pbChatLog)
|
||||
}
|
||||
|
||||
resp.ChatLogsNum = total
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -16,8 +16,11 @@ package third
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/third"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
@ -152,7 +155,21 @@ func (t *thirdServer) CompleteMultipartUpload(ctx context.Context, req *third.Co
|
||||
}
|
||||
|
||||
func (t *thirdServer) AccessURL(ctx context.Context, req *third.AccessURLReq) (*third.AccessURLResp, error) {
|
||||
expireTime, rawURL, err := t.s3dataBase.AccessURL(ctx, req.Name, t.defaultExpire)
|
||||
opt := &s3.AccessURLOption{}
|
||||
if len(req.Query) > 0 {
|
||||
switch req.Query["type"] {
|
||||
case "":
|
||||
case "image":
|
||||
opt.Image = &s3.Image{}
|
||||
opt.Image.Format = req.Query["format"]
|
||||
opt.Image.Width, _ = strconv.Atoi(req.Query["width"])
|
||||
opt.Image.Height, _ = strconv.Atoi(req.Query["height"])
|
||||
log.ZDebug(ctx, "AccessURL image", "name", req.Name, "option", opt.Image)
|
||||
default:
|
||||
return nil, errs.ErrArgs.Wrap("invalid query type")
|
||||
}
|
||||
}
|
||||
expireTime, rawURL, err := t.s3dataBase.AccessURL(ctx, req.Name, t.defaultExpire, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -49,6 +49,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
if apiURL[len(apiURL)-1] != '/' {
|
||||
apiURL += "/"
|
||||
}
|
||||
apiURL += "object/"
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -36,10 +36,11 @@ import (
|
||||
tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
|
||||
registry "github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pbuser "github.com/OpenIMSDK/protocol/user"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type userServer struct {
|
||||
|
||||
@ -26,12 +26,13 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
)
|
||||
|
||||
func StartCronTask() error {
|
||||
func StartTask() error {
|
||||
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
|
||||
msgTool, err := InitMsgTool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgTool.ConvertTools()
|
||||
c := cron.New()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
32
internal/tools/msg_doc_convert.go
Normal file
32
internal/tools/msg_doc_convert.go
Normal file
@ -0,0 +1,32 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
)
|
||||
|
||||
func (c *MsgTool) ConvertTools() {
|
||||
ctx := mcontext.NewCtx("convert")
|
||||
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get all conversation ids failed", err)
|
||||
return
|
||||
}
|
||||
for _, conversationID := range conversationIDs {
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationIDByConversationID(conversationID))
|
||||
}
|
||||
userIDs, err := c.userDatabase.GetAllUserID(ctx, 0, 0)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get all user ids failed", err)
|
||||
return
|
||||
}
|
||||
log.ZDebug(ctx, "all userIDs", "len userIDs", len(userIDs))
|
||||
for _, userID := range userIDs {
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID))
|
||||
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID))
|
||||
}
|
||||
log.ZDebug(ctx, "all conversationIDs", "len userIDs", len(conversationIDs))
|
||||
c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
||||
}
|
||||
@ -120,6 +120,7 @@ type configStruct struct {
|
||||
AccessKeyID string `yaml:"accessKeyID"`
|
||||
SecretAccessKey string `yaml:"secretAccessKey"`
|
||||
SessionToken string `yaml:"sessionToken"`
|
||||
SignEndpoint string `yaml:"signEndpoint"`
|
||||
} `yaml:"minio"`
|
||||
Cos struct {
|
||||
BucketURL string `yaml:"bucketURL"`
|
||||
|
||||
@ -386,8 +386,24 @@ func (g *groupDatabase) HandlerGroupRequest(
|
||||
handleResult int32,
|
||||
member *relationTb.GroupMemberModel,
|
||||
) error {
|
||||
cache := g.cache.NewCache()
|
||||
if err := g.tx.Transaction(func(tx any) error {
|
||||
//cache := g.cache.NewCache()
|
||||
//if err := g.tx.Transaction(func(tx any) error {
|
||||
// if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if member != nil {
|
||||
// if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID)
|
||||
// }
|
||||
// return nil
|
||||
//}); err != nil {
|
||||
// return err
|
||||
//}
|
||||
//return cache.ExecDel(ctx)
|
||||
|
||||
return g.tx.Transaction(func(tx any) error {
|
||||
if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -395,13 +411,12 @@ func (g *groupDatabase) HandlerGroupRequest(
|
||||
if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
|
||||
return err
|
||||
}
|
||||
cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err := g.cache.NewCache().DelGroupMembersHash(groupID).DelGroupMembersInfo(groupID, member.UserID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return cache.ExecDel(ctx)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error {
|
||||
|
||||
@ -118,6 +118,7 @@ type CommonMsgDatabase interface {
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
||||
@ -961,7 +962,14 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.Searc
|
||||
return 0, nil, err
|
||||
}
|
||||
for _, msg := range msgs {
|
||||
if msg.IsRead {
|
||||
msg.Msg.IsRead = true
|
||||
}
|
||||
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
|
||||
}
|
||||
return total, totalMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ type S3Database interface {
|
||||
AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
|
||||
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
|
||||
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
|
||||
AccessURL(ctx context.Context, name string, expire time.Duration) (time.Time, string, error)
|
||||
AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error)
|
||||
SetObject(ctx context.Context, info *relation.ObjectModel) error
|
||||
}
|
||||
|
||||
@ -70,14 +70,19 @@ func (s *s3Database) SetObject(ctx context.Context, info *relation.ObjectModel)
|
||||
return s.obj.SetObject(ctx, info)
|
||||
}
|
||||
|
||||
func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Duration) (time.Time, string, error) {
|
||||
func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) {
|
||||
obj, err := s.obj.Take(ctx, name)
|
||||
if err != nil {
|
||||
return time.Time{}, "", err
|
||||
}
|
||||
opt := &s3.AccessURLOption{
|
||||
ContentType: obj.ContentType,
|
||||
Filename: filepath.Base(obj.Name),
|
||||
if opt == nil {
|
||||
opt = &s3.AccessURLOption{}
|
||||
}
|
||||
if opt.ContentType == "" {
|
||||
opt.ContentType = obj.ContentType
|
||||
}
|
||||
if opt.Filename == "" {
|
||||
opt.Filename = filepath.Base(obj.Name)
|
||||
}
|
||||
expireTime := time.Now().Add(expire)
|
||||
rawURL, err := s.s3.AccessURL(ctx, obj.Key, expire, opt)
|
||||
|
||||
@ -18,9 +18,10 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/protocol/user"
|
||||
|
||||
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/tx"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
@ -86,7 +86,11 @@ func (u *UserGorm) Page(
|
||||
|
||||
// 获取所有用户ID.
|
||||
func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
|
||||
if pageNumber == 0 || showNumber == 0 {
|
||||
return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error)
|
||||
} else {
|
||||
return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
|
||||
|
||||
@ -257,5 +257,9 @@ func (c *Controller) IsNotFound(err error) bool {
|
||||
}
|
||||
|
||||
func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
|
||||
if opt.Image != nil {
|
||||
opt.Filename = ""
|
||||
opt.ContentType = ""
|
||||
}
|
||||
return c.impl.AccessURL(ctx, name, expire, opt)
|
||||
}
|
||||
|
||||
@ -36,6 +36,19 @@ const (
|
||||
maxNumSize = 1000
|
||||
)
|
||||
|
||||
const (
|
||||
imagePng = "png"
|
||||
imageJpg = "jpg"
|
||||
imageJpeg = "jpeg"
|
||||
imageGif = "gif"
|
||||
imageWebp = "webp"
|
||||
)
|
||||
|
||||
const (
|
||||
videoSnapshotImagePng = "png"
|
||||
videoSnapshotImageJpg = "jpg"
|
||||
)
|
||||
|
||||
func NewCos() (s3.Interface, error) {
|
||||
conf := config.Config.Object.Cos
|
||||
u, err := url.Parse(conf.BucketURL)
|
||||
@ -248,19 +261,44 @@ func (c *Cos) ListUploadedParts(ctx context.Context, uploadID string, name strin
|
||||
}
|
||||
|
||||
func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
|
||||
var option *cos.PresignedURLOptions
|
||||
var imageMogr string
|
||||
var option cos.PresignedURLOptions
|
||||
if opt != nil {
|
||||
query := make(url.Values)
|
||||
if opt.Image != nil {
|
||||
// https://cloud.tencent.com/document/product/436/44880
|
||||
style := make([]string, 0, 2)
|
||||
wh := make([]string, 2)
|
||||
if opt.Image.Width > 0 {
|
||||
wh[0] = strconv.Itoa(opt.Image.Width)
|
||||
}
|
||||
if opt.Image.Height > 0 {
|
||||
wh[1] = strconv.Itoa(opt.Image.Height)
|
||||
}
|
||||
if opt.Image.Width > 0 || opt.Image.Height > 0 {
|
||||
style = append(style, strings.Join(wh, "x"))
|
||||
}
|
||||
switch opt.Image.Format {
|
||||
case
|
||||
imagePng,
|
||||
imageJpg,
|
||||
imageJpeg,
|
||||
imageGif,
|
||||
imageWebp:
|
||||
style = append(style, "format/"+opt.Image.Format)
|
||||
}
|
||||
if len(style) > 0 {
|
||||
imageMogr = "&imageMogr2/thumbnail/" + strings.Join(style, "/") + "/ignore-error/1"
|
||||
}
|
||||
}
|
||||
if opt.ContentType != "" {
|
||||
query.Set("response-content-type", opt.ContentType)
|
||||
}
|
||||
if opt.Filename != "" {
|
||||
query.Set("response-content-disposition", `attachment; filename="`+opt.Filename+`"`)
|
||||
query.Set("response-content-disposition", `attachment; filename=`+strconv.Quote(opt.Filename))
|
||||
}
|
||||
if len(query) > 0 {
|
||||
option = &cos.PresignedURLOptions{
|
||||
Query: &query,
|
||||
}
|
||||
option.Query = &query
|
||||
}
|
||||
}
|
||||
if expire <= 0 {
|
||||
@ -268,9 +306,13 @@ func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration,
|
||||
} else if expire < time.Second {
|
||||
expire = time.Second
|
||||
}
|
||||
rawURL, err := c.client.Object.GetPresignedURL(ctx, http.MethodGet, name, c.credential.SecretID, c.credential.SecretKey, expire, option)
|
||||
rawURL, err := c.client.Object.GetPresignedURL(ctx, http.MethodGet, name, c.credential.SecretID, c.credential.SecretKey, expire, &option)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return rawURL.String(), nil
|
||||
urlStr := rawURL.String()
|
||||
if imageMogr != "" {
|
||||
urlStr += imageMogr
|
||||
}
|
||||
return urlStr, nil
|
||||
}
|
||||
|
||||
106
pkg/common/db/s3/minio/image.go
Normal file
106
pkg/common/db/s3/minio/image.go
Normal file
@ -0,0 +1,106 @@
|
||||
package minio
|
||||
|
||||
import (
|
||||
"image"
|
||||
_ "image/gif"
|
||||
_ "image/jpeg"
|
||||
_ "image/png"
|
||||
"io"
|
||||
|
||||
_ "golang.org/x/image/bmp"
|
||||
_ "golang.org/x/image/tiff"
|
||||
_ "golang.org/x/image/webp"
|
||||
)
|
||||
|
||||
const (
|
||||
formatPng = "png"
|
||||
formatJpeg = "jpeg"
|
||||
formatJpg = "jpg"
|
||||
formatGif = "gif"
|
||||
)
|
||||
|
||||
func ImageStat(reader io.Reader) (image.Image, string, error) {
|
||||
return image.Decode(reader)
|
||||
}
|
||||
|
||||
func ImageWidthHeight(img image.Image) (int, int) {
|
||||
bounds := img.Bounds().Max
|
||||
return bounds.X, bounds.Y
|
||||
}
|
||||
|
||||
func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
|
||||
bounds := img.Bounds()
|
||||
imgWidth := bounds.Max.X
|
||||
imgHeight := bounds.Max.Y
|
||||
|
||||
// 计算缩放比例
|
||||
scaleWidth := float64(maxWidth) / float64(imgWidth)
|
||||
scaleHeight := float64(maxHeight) / float64(imgHeight)
|
||||
|
||||
// 如果都为0,则不缩放,返回原始图片
|
||||
if maxWidth == 0 && maxHeight == 0 {
|
||||
return img
|
||||
}
|
||||
|
||||
// 如果宽度和高度都大于0,则选择较小的缩放比例,以保持宽高比
|
||||
if maxWidth > 0 && maxHeight > 0 {
|
||||
scale := scaleWidth
|
||||
if scaleHeight < scaleWidth {
|
||||
scale = scaleHeight
|
||||
}
|
||||
|
||||
// 计算缩略图尺寸
|
||||
thumbnailWidth := int(float64(imgWidth) * scale)
|
||||
thumbnailHeight := int(float64(imgHeight) * scale)
|
||||
|
||||
// 使用"image"库的Resample方法生成缩略图
|
||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||
for y := 0; y < thumbnailHeight; y++ {
|
||||
for x := 0; x < thumbnailWidth; x++ {
|
||||
srcX := int(float64(x) / scale)
|
||||
srcY := int(float64(y) / scale)
|
||||
thumbnail.Set(x, y, img.At(srcX, srcY))
|
||||
}
|
||||
}
|
||||
|
||||
return thumbnail
|
||||
}
|
||||
|
||||
// 如果只指定了宽度或高度,则根据最大不超过的规则生成缩略图
|
||||
if maxWidth > 0 {
|
||||
thumbnailWidth := maxWidth
|
||||
thumbnailHeight := int(float64(imgHeight) * scaleWidth)
|
||||
|
||||
// 使用"image"库的Resample方法生成缩略图
|
||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||
for y := 0; y < thumbnailHeight; y++ {
|
||||
for x := 0; x < thumbnailWidth; x++ {
|
||||
srcX := int(float64(x) / scaleWidth)
|
||||
srcY := int(float64(y) / scaleWidth)
|
||||
thumbnail.Set(x, y, img.At(srcX, srcY))
|
||||
}
|
||||
}
|
||||
|
||||
return thumbnail
|
||||
}
|
||||
|
||||
if maxHeight > 0 {
|
||||
thumbnailWidth := int(float64(imgWidth) * scaleHeight)
|
||||
thumbnailHeight := maxHeight
|
||||
|
||||
// 使用"image"库的Resample方法生成缩略图
|
||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||
for y := 0; y < thumbnailHeight; y++ {
|
||||
for x := 0; x < thumbnailWidth; x++ {
|
||||
srcX := int(float64(x) / scaleHeight)
|
||||
srcY := int(float64(y) / scaleHeight)
|
||||
thumbnail.Set(x, y, img.At(srcX, srcY))
|
||||
}
|
||||
}
|
||||
|
||||
return thumbnail
|
||||
}
|
||||
|
||||
// 默认情况下,返回原始图片
|
||||
return img
|
||||
}
|
||||
@ -15,16 +15,28 @@
|
||||
package minio
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"image"
|
||||
"image/gif"
|
||||
"image/jpeg"
|
||||
"image/png"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/minio/minio-go/v7/pkg/signer"
|
||||
@ -43,6 +55,13 @@ const (
|
||||
maxNumSize = 10000
|
||||
)
|
||||
|
||||
const (
|
||||
maxImageWidth = 1024
|
||||
maxImageHeight = 1024
|
||||
maxImageSize = 1024 * 1024 * 50
|
||||
pathInfo = "openim/thumbnail"
|
||||
)
|
||||
|
||||
func NewMinio() (s3.Interface, error) {
|
||||
conf := config.Config.Object.Minio
|
||||
u, err := url.Parse(conf.Endpoint)
|
||||
@ -60,11 +79,26 @@ func NewMinio() (s3.Interface, error) {
|
||||
m := &Minio{
|
||||
bucket: conf.Bucket,
|
||||
bucketURL: conf.Endpoint + "/" + conf.Bucket + "/",
|
||||
opts: opts,
|
||||
core: &minio.Core{Client: client},
|
||||
lock: &sync.Mutex{},
|
||||
init: false,
|
||||
}
|
||||
if conf.SignEndpoint == "" || conf.SignEndpoint == conf.Endpoint {
|
||||
m.sign = m.core.Client
|
||||
} else {
|
||||
su, err := url.Parse(conf.SignEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.opts = &minio.Options{
|
||||
Creds: credentials.NewStaticV4(conf.AccessKeyID, conf.SecretAccessKey, conf.SessionToken),
|
||||
Secure: su.Scheme == "https",
|
||||
}
|
||||
m.sign, err = minio.New(su.Host, m.opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := m.initMinio(ctx); err != nil {
|
||||
@ -76,8 +110,10 @@ func NewMinio() (s3.Interface, error) {
|
||||
type Minio struct {
|
||||
bucket string
|
||||
bucketURL string
|
||||
location string
|
||||
opts *minio.Options
|
||||
core *minio.Core
|
||||
sign *minio.Client
|
||||
lock sync.Locker
|
||||
init bool
|
||||
}
|
||||
@ -91,15 +127,43 @@ func (m *Minio) initMinio(ctx context.Context) error {
|
||||
if m.init {
|
||||
return nil
|
||||
}
|
||||
exists, err := m.core.Client.BucketExists(ctx, config.Config.Object.Minio.Bucket)
|
||||
conf := config.Config.Object.Minio
|
||||
exists, err := m.core.Client.BucketExists(ctx, conf.Bucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check bucket exists error: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
if err := m.core.Client.MakeBucket(ctx, config.Config.Object.Minio.Bucket, minio.MakeBucketOptions{}); err != nil {
|
||||
if err := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil {
|
||||
return fmt.Errorf("make bucket error: %w", err)
|
||||
}
|
||||
}
|
||||
m.location, err = m.core.Client.GetBucketLocation(ctx, conf.Bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func() {
|
||||
if conf.SignEndpoint == "" || conf.SignEndpoint == conf.Endpoint {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
m.sign = m.core.Client
|
||||
log.ZWarn(
|
||||
context.Background(),
|
||||
"set sign bucket location cache panic",
|
||||
errors.New("failed to get private field value"),
|
||||
"recover",
|
||||
fmt.Sprintf("%+v", r),
|
||||
"development version",
|
||||
"github.com/minio/minio-go/v7 v7.0.61",
|
||||
)
|
||||
}
|
||||
}()
|
||||
blc := reflect.ValueOf(m.sign).Elem().FieldByName("bucketLocCache")
|
||||
vblc := reflect.New(reflect.PtrTo(blc.Type()))
|
||||
*(*unsafe.Pointer)(vblc.UnsafePointer()) = unsafe.Pointer(blc.UnsafeAddr())
|
||||
vblc.Elem().Elem().Interface().(interface{ Set(string, string) }).Set(conf.Bucket, m.location)
|
||||
}()
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@ -191,7 +255,7 @@ func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expi
|
||||
return nil, err
|
||||
}
|
||||
request.Header.Set("X-Amz-Content-Sha256", unsignedPayload)
|
||||
request = signer.SignV4Trailer(*request, creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken, "us-east-1", nil)
|
||||
request = signer.SignV4Trailer(*request, creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken, m.location, nil)
|
||||
result.Parts[i] = s3.SignPart{
|
||||
PartNumber: partNumber,
|
||||
URL: request.URL.String(),
|
||||
@ -206,7 +270,7 @@ func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time
|
||||
if err := m.initMinio(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
rawURL, err := m.core.Client.PresignedPutObject(ctx, m.bucket, name, expire)
|
||||
rawURL, err := m.sign.PresignedPutObject(ctx, m.bucket, name, expire)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -303,6 +367,19 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *Minio) presignedGetObject(ctx context.Context, name string, expire time.Duration, query url.Values) (string, error) {
|
||||
if expire <= 0 {
|
||||
expire = time.Hour * 24 * 365 * 99 // 99 years
|
||||
} else if expire < time.Second {
|
||||
expire = time.Second
|
||||
}
|
||||
rawURL, err := m.sign.PresignedGetObject(ctx, m.bucket, name, expire, query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return rawURL.String(), nil
|
||||
}
|
||||
|
||||
func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
|
||||
if err := m.initMinio(ctx); err != nil {
|
||||
return "", err
|
||||
@ -313,17 +390,123 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
|
||||
reqParams.Set("response-content-type", opt.ContentType)
|
||||
}
|
||||
if opt.Filename != "" {
|
||||
reqParams.Set("response-content-disposition", `attachment; filename="`+opt.Filename+`"`)
|
||||
reqParams.Set("response-content-disposition", `attachment; filename=`+strconv.Quote(opt.Filename))
|
||||
}
|
||||
}
|
||||
if expire <= 0 {
|
||||
expire = time.Hour * 24 * 365 * 99 // 99 years
|
||||
} else if expire < time.Second {
|
||||
expire = time.Second
|
||||
if opt.Image == nil || (opt.Image.Width < 0 && opt.Image.Height < 0 && opt.Image.Format == "") || (opt.Image.Width > maxImageWidth || opt.Image.Height > maxImageHeight) {
|
||||
return m.presignedGetObject(ctx, name, expire, reqParams)
|
||||
}
|
||||
u, err := m.core.Client.PresignedGetObject(ctx, m.bucket, name, expire, reqParams)
|
||||
fileInfo, err := m.StatObject(ctx, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return u.String(), nil
|
||||
if fileInfo.Size > maxImageSize {
|
||||
return "", errors.New("file size too large")
|
||||
}
|
||||
objectInfoPath := path.Join(pathInfo, fileInfo.ETag, "image.json")
|
||||
var (
|
||||
img image.Image
|
||||
info minioImageInfo
|
||||
)
|
||||
data, err := m.getObjectData(ctx, objectInfoPath, 1024)
|
||||
if err == nil {
|
||||
if err := json.Unmarshal(data, &info); err != nil {
|
||||
return "", fmt.Errorf("unmarshal minio image info.json error: %w", err)
|
||||
}
|
||||
if info.NotImage {
|
||||
return "", errors.New("not image")
|
||||
}
|
||||
} else if m.IsNotFound(err) {
|
||||
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer reader.Close()
|
||||
imageInfo, format, err := ImageStat(reader)
|
||||
if err == nil {
|
||||
info.NotImage = false
|
||||
info.Format = format
|
||||
info.Width, info.Height = ImageWidthHeight(imageInfo)
|
||||
img = imageInfo
|
||||
} else {
|
||||
info.NotImage = true
|
||||
}
|
||||
data, err := json.Marshal(&info)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if _, err := m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
return "", err
|
||||
}
|
||||
if opt.Image.Width > info.Width || opt.Image.Width <= 0 {
|
||||
opt.Image.Width = info.Width
|
||||
}
|
||||
if opt.Image.Height > info.Height || opt.Image.Height <= 0 {
|
||||
opt.Image.Height = info.Height
|
||||
}
|
||||
opt.Image.Format = strings.ToLower(opt.Image.Format)
|
||||
if opt.Image.Format == formatJpg {
|
||||
opt.Image.Format = formatJpeg
|
||||
}
|
||||
switch opt.Image.Format {
|
||||
case formatPng:
|
||||
case formatJpeg:
|
||||
case formatGif:
|
||||
default:
|
||||
if info.Format == formatGif {
|
||||
opt.Image.Format = formatGif
|
||||
} else {
|
||||
opt.Image.Format = formatJpeg
|
||||
}
|
||||
}
|
||||
reqParams.Set("response-content-type", "image/"+opt.Image.Format)
|
||||
if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format {
|
||||
return m.presignedGetObject(ctx, name, expire, reqParams)
|
||||
}
|
||||
cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format))
|
||||
if _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}); err == nil {
|
||||
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
|
||||
} else if !m.IsNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
if img == nil {
|
||||
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer reader.Close()
|
||||
img, _, err = ImageStat(reader)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
thumbnail := resizeImage(img, opt.Image.Width, opt.Image.Height)
|
||||
buf := bytes.NewBuffer(nil)
|
||||
switch opt.Image.Format {
|
||||
case formatPng:
|
||||
err = png.Encode(buf, thumbnail)
|
||||
case formatJpeg:
|
||||
err = jpeg.Encode(buf, thumbnail, nil)
|
||||
case formatGif:
|
||||
err = gif.Encode(buf, thumbnail, nil)
|
||||
}
|
||||
if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
|
||||
}
|
||||
|
||||
func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]byte, error) {
|
||||
object, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer object.Close()
|
||||
if limit < 0 {
|
||||
return io.ReadAll(object)
|
||||
}
|
||||
return io.ReadAll(io.LimitReader(object, 1024))
|
||||
}
|
||||
|
||||
8
pkg/common/db/s3/minio/struct.go
Normal file
8
pkg/common/db/s3/minio/struct.go
Normal file
@ -0,0 +1,8 @@
|
||||
package minio
|
||||
|
||||
type minioImageInfo struct {
|
||||
NotImage bool `json:"notImage,omitempty"`
|
||||
Width int `json:"width,omitempty"`
|
||||
Height int `json:"height,omitempty"`
|
||||
Format string `json:"format,omitempty"`
|
||||
}
|
||||
@ -36,6 +36,19 @@ const (
|
||||
maxNumSize = 10000
|
||||
)
|
||||
|
||||
const (
|
||||
imagePng = "png"
|
||||
imageJpg = "jpg"
|
||||
imageJpeg = "jpeg"
|
||||
imageGif = "gif"
|
||||
imageWebp = "webp"
|
||||
)
|
||||
|
||||
const (
|
||||
videoSnapshotImagePng = "png"
|
||||
videoSnapshotImageJpg = "jpg"
|
||||
)
|
||||
|
||||
func NewOSS() (s3.Interface, error) {
|
||||
conf := config.Config.Object.Oss
|
||||
if conf.BucketURL == "" {
|
||||
@ -139,7 +152,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire
|
||||
}
|
||||
for i, partNumber := range partNumbers {
|
||||
rawURL := fmt.Sprintf(`%s%s?partNumber=%d&uploadId=%s`, o.bucketURL, name, partNumber, uploadID)
|
||||
request, err := http.NewRequestWithContext(ctx, http.MethodPut, rawURL, nil)
|
||||
request, err := http.NewRequest(http.MethodPut, rawURL, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -150,12 +163,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire
|
||||
request.Header.Set(oss.HTTPHeaderHost, request.Host)
|
||||
request.Header.Set(oss.HTTPHeaderDate, now)
|
||||
request.Header.Set(oss.HttpHeaderOssDate, now)
|
||||
authorization := fmt.Sprintf(
|
||||
`OSS %s:%s`,
|
||||
o.credentials.GetAccessKeyID(),
|
||||
o.getSignedStr(request, fmt.Sprintf(`/%s/%s?partNumber=%d&uploadId=%s`, o.bucket.BucketName, name, partNumber, uploadID), o.credentials.GetAccessKeySecret()),
|
||||
)
|
||||
request.Header.Set(oss.HTTPHeaderAuthorization, authorization)
|
||||
ossSignHeader(o.bucket.Client.Conn, request, fmt.Sprintf(`/%s/%s?partNumber=%d&uploadId=%s`, o.bucket.BucketName, name, partNumber, uploadID))
|
||||
delete(request.Header, oss.HTTPHeaderDate)
|
||||
result.Parts[i] = s3.SignPart{
|
||||
PartNumber: partNumber,
|
||||
@ -266,11 +274,36 @@ func (o *OSS) ListUploadedParts(ctx context.Context, uploadID string, name strin
|
||||
func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
|
||||
var opts []oss.Option
|
||||
if opt != nil {
|
||||
if opt.Image != nil {
|
||||
// 文档地址: https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spm=a2c4g.11186623.0.0.4b3b1e4fWW6yji
|
||||
var format string
|
||||
switch opt.Image.Format {
|
||||
case
|
||||
imagePng,
|
||||
imageJpg,
|
||||
imageJpeg,
|
||||
imageGif,
|
||||
imageWebp:
|
||||
format = opt.Image.Format
|
||||
default:
|
||||
opt.Image.Format = imageJpg
|
||||
}
|
||||
// https://oss-console-img-demo-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,h_100,m_lfit
|
||||
process := "image/resize,m_lfit"
|
||||
if opt.Image.Width > 0 {
|
||||
process += ",w_" + strconv.Itoa(opt.Image.Width)
|
||||
}
|
||||
if opt.Image.Height > 0 {
|
||||
process += ",h_" + strconv.Itoa(opt.Image.Height)
|
||||
}
|
||||
process += ",format," + format
|
||||
opts = append(opts, oss.Process(process))
|
||||
}
|
||||
if opt.ContentType != "" {
|
||||
opts = append(opts, oss.ResponseContentType(opt.ContentType))
|
||||
}
|
||||
if opt.Filename != "" {
|
||||
opts = append(opts, oss.ResponseContentDisposition(`attachment; filename="`+opt.Filename+`"`))
|
||||
opts = append(opts, oss.ResponseContentDisposition(`attachment; filename=`+strconv.Quote(opt.Filename)))
|
||||
}
|
||||
}
|
||||
if expire <= 0 {
|
||||
|
||||
@ -1,96 +1,11 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package oss
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
_ "unsafe"
|
||||
|
||||
"github.com/aliyun/aliyun-oss-go-sdk/oss"
|
||||
)
|
||||
|
||||
func (o *OSS) getAdditionalHeaderKeys(req *http.Request) ([]string, map[string]string) {
|
||||
var keysList []string
|
||||
keysMap := make(map[string]string)
|
||||
srcKeys := make(map[string]string)
|
||||
|
||||
for k := range req.Header {
|
||||
srcKeys[strings.ToLower(k)] = ""
|
||||
}
|
||||
|
||||
for _, v := range o.bucket.Client.Config.AdditionalHeaders {
|
||||
if _, ok := srcKeys[strings.ToLower(v)]; ok {
|
||||
keysMap[strings.ToLower(v)] = ""
|
||||
}
|
||||
}
|
||||
|
||||
for k := range keysMap {
|
||||
keysList = append(keysList, k)
|
||||
}
|
||||
sort.Strings(keysList)
|
||||
return keysList, keysMap
|
||||
}
|
||||
|
||||
func (o *OSS) getSignedStr(req *http.Request, canonicalizedResource string, keySecret string) string {
|
||||
// Find out the "x-oss-"'s address in header of the request
|
||||
ossHeadersMap := make(map[string]string)
|
||||
additionalList, additionalMap := o.getAdditionalHeaderKeys(req)
|
||||
for k, v := range req.Header {
|
||||
if strings.HasPrefix(strings.ToLower(k), "x-oss-") {
|
||||
ossHeadersMap[strings.ToLower(k)] = v[0]
|
||||
} else if o.bucket.Client.Config.AuthVersion == oss.AuthV2 {
|
||||
if _, ok := additionalMap[strings.ToLower(k)]; ok {
|
||||
ossHeadersMap[strings.ToLower(k)] = v[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
hs := newHeaderSorter(ossHeadersMap)
|
||||
|
||||
// Sort the ossHeadersMap by the ascending order
|
||||
hs.Sort()
|
||||
|
||||
// Get the canonicalizedOSSHeaders
|
||||
canonicalizedOSSHeaders := ""
|
||||
for i := range hs.Keys {
|
||||
canonicalizedOSSHeaders += hs.Keys[i] + ":" + hs.Vals[i] + "\n"
|
||||
}
|
||||
|
||||
// Give other parameters values
|
||||
// when sign URL, date is expires
|
||||
date := req.Header.Get(oss.HTTPHeaderDate)
|
||||
contentType := req.Header.Get(oss.HTTPHeaderContentType)
|
||||
contentMd5 := req.Header.Get(oss.HTTPHeaderContentMD5)
|
||||
|
||||
// default is v1 signature
|
||||
signStr := req.Method + "\n" + contentMd5 + "\n" + contentType + "\n" + date + "\n" + canonicalizedOSSHeaders + canonicalizedResource
|
||||
h := hmac.New(func() hash.Hash { return sha1.New() }, []byte(keySecret))
|
||||
|
||||
// v2 signature
|
||||
if o.bucket.Client.Config.AuthVersion == oss.AuthV2 {
|
||||
signStr = req.Method + "\n" + contentMd5 + "\n" + contentType + "\n" + date + "\n" + canonicalizedOSSHeaders + strings.Join(additionalList, ";") + "\n" + canonicalizedResource
|
||||
h = hmac.New(func() hash.Hash { return sha256.New() }, []byte(keySecret))
|
||||
}
|
||||
_, _ = io.WriteString(h, signStr)
|
||||
signedStr := base64.StdEncoding.EncodeToString(h.Sum(nil))
|
||||
|
||||
return signedStr
|
||||
}
|
||||
//go:linkname ossSignHeader github.com/aliyun/aliyun-oss-go-sdk/oss.(*Conn).signHeader
|
||||
func ossSignHeader(c *oss.Conn, req *http.Request, canonicalizedResource string)
|
||||
|
||||
@ -1,61 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package oss
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// headerSorter defines the key-value structure for storing the sorted data in signHeader.
|
||||
type headerSorter struct {
|
||||
Keys []string
|
||||
Vals []string
|
||||
}
|
||||
|
||||
// newHeaderSorter is an additional function for function SignHeader.
|
||||
func newHeaderSorter(m map[string]string) *headerSorter {
|
||||
hs := &headerSorter{
|
||||
Keys: make([]string, 0, len(m)),
|
||||
Vals: make([]string, 0, len(m)),
|
||||
}
|
||||
|
||||
for k, v := range m {
|
||||
hs.Keys = append(hs.Keys, k)
|
||||
hs.Vals = append(hs.Vals, v)
|
||||
}
|
||||
return hs
|
||||
}
|
||||
|
||||
// Sort is an additional function for function SignHeader.
|
||||
func (hs *headerSorter) Sort() {
|
||||
sort.Sort(hs)
|
||||
}
|
||||
|
||||
// Len is an additional function for function SignHeader.
|
||||
func (hs *headerSorter) Len() int {
|
||||
return len(hs.Vals)
|
||||
}
|
||||
|
||||
// Less is an additional function for function SignHeader.
|
||||
func (hs *headerSorter) Less(i, j int) bool {
|
||||
return bytes.Compare([]byte(hs.Keys[i]), []byte(hs.Keys[j])) < 0
|
||||
}
|
||||
|
||||
// Swap is an additional function for function SignHeader.
|
||||
func (hs *headerSorter) Swap(i, j int) {
|
||||
hs.Vals[i], hs.Vals[j] = hs.Vals[j], hs.Vals[i]
|
||||
hs.Keys[i], hs.Keys[j] = hs.Keys[j], hs.Keys[i]
|
||||
}
|
||||
@ -116,9 +116,16 @@ type ListUploadedPartsResult struct {
|
||||
UploadedParts []UploadedPart `xml:"Part"`
|
||||
}
|
||||
|
||||
type Image struct {
|
||||
Format string `json:"format"`
|
||||
Width int `json:"width"`
|
||||
Height int `json:"height"`
|
||||
}
|
||||
|
||||
type AccessURLOption struct {
|
||||
ContentType string `json:"contentType"`
|
||||
Filename string `json:"filename"`
|
||||
Image *Image `json:"image"`
|
||||
}
|
||||
|
||||
type Interface interface {
|
||||
|
||||
@ -27,7 +27,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
singleGocMsgNum = 5000
|
||||
singleGocMsgNum = 100
|
||||
singleGocMsgNum5000 = 5000
|
||||
Msg = "msg"
|
||||
OldestList = 0
|
||||
NewestList = -1
|
||||
@ -128,6 +129,7 @@ type MsgDocModelInterface interface {
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
func (MsgDocModel) TableName() string {
|
||||
@ -138,6 +140,10 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
|
||||
return singleGocMsgNum
|
||||
}
|
||||
|
||||
func (MsgDocModel) GetSingleGocMsgNum5000() int64 {
|
||||
return singleGocMsgNum5000
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) IsFull() bool {
|
||||
return m.Msg[len(m.Msg)-1].Msg != nil
|
||||
}
|
||||
|
||||
@ -1073,11 +1073,6 @@ func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessa
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
for _, msg1 := range msgs {
|
||||
if msg1.IsRead {
|
||||
msg1.Msg.IsRead = true
|
||||
}
|
||||
}
|
||||
return total, msgs, nil
|
||||
}
|
||||
|
||||
@ -1151,13 +1146,22 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
|
||||
{"doc_id", 1},
|
||||
}},
|
||||
},
|
||||
{
|
||||
{"$unwind", bson.M{"path": "$msgs"}},
|
||||
},
|
||||
{
|
||||
{"$sort", bson.M{"msgs.msg.send_time": -1}},
|
||||
},
|
||||
}
|
||||
cursor, err := m.MsgCollection.Aggregate(ctx, pipe)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
var msgsDocs []table.MsgDocModel
|
||||
type docModel struct {
|
||||
DocID string `bson:"doc_id"`
|
||||
Msg *table.MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
var msgsDocs []docModel
|
||||
err = cursor.All(ctx, &msgsDocs)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
@ -1167,24 +1171,23 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
|
||||
}
|
||||
msgs := make([]*table.MsgInfoModel, 0)
|
||||
for index := range msgsDocs {
|
||||
for i := range msgsDocs[index].Msg {
|
||||
msg := msgsDocs[index].Msg[i]
|
||||
if msg == nil || msg.Msg == nil {
|
||||
msgInfo := msgsDocs[index].Msg
|
||||
if msgInfo == nil || msgInfo.Msg == nil {
|
||||
continue
|
||||
}
|
||||
if msg.Revoke != nil {
|
||||
if msgInfo.Revoke != nil {
|
||||
revokeContent := sdkws.MessageRevokedContent{
|
||||
RevokerID: msg.Revoke.UserID,
|
||||
RevokerRole: msg.Revoke.Role,
|
||||
ClientMsgID: msg.Msg.ClientMsgID,
|
||||
RevokerNickname: msg.Revoke.Nickname,
|
||||
RevokeTime: msg.Revoke.Time,
|
||||
SourceMessageSendTime: msg.Msg.SendTime,
|
||||
SourceMessageSendID: msg.Msg.SendID,
|
||||
SourceMessageSenderNickname: msg.Msg.SenderNickname,
|
||||
SessionType: msg.Msg.SessionType,
|
||||
Seq: msg.Msg.Seq,
|
||||
Ex: msg.Msg.Ex,
|
||||
RevokerID: msgInfo.Revoke.UserID,
|
||||
RevokerRole: msgInfo.Revoke.Role,
|
||||
ClientMsgID: msgInfo.Msg.ClientMsgID,
|
||||
RevokerNickname: msgInfo.Revoke.Nickname,
|
||||
RevokeTime: msgInfo.Revoke.Time,
|
||||
SourceMessageSendTime: msgInfo.Msg.SendTime,
|
||||
SourceMessageSendID: msgInfo.Msg.SendID,
|
||||
SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
|
||||
SessionType: msgInfo.Msg.SessionType,
|
||||
Seq: msgInfo.Msg.Seq,
|
||||
Ex: msgInfo.Msg.Ex,
|
||||
}
|
||||
data, err := json.Marshal(&revokeContent)
|
||||
if err != nil {
|
||||
@ -1197,11 +1200,10 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
msg.Msg.ContentType = constant.MsgRevokeNotification
|
||||
msg.Msg.Content = string(content)
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
|
||||
msgInfo.Msg.Content = string(content)
|
||||
}
|
||||
msgs = append(msgs, msgInfo)
|
||||
}
|
||||
start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
|
||||
n := int32(len(msgs))
|
||||
|
||||
67
pkg/common/db/unrelation/msg_convert.go
Normal file
67
pkg/common/db/unrelation/msg_convert.go
Normal file
@ -0,0 +1,67 @@
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
for _, conversationID := range conversationIDs {
|
||||
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var msgDocs []table.MsgDocModel
|
||||
err = cursor.All(ctx, &msgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
if len(msgDocs) < 1 {
|
||||
continue
|
||||
}
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
|
||||
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
|
||||
if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
|
||||
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var newMsgDocs []interface{}
|
||||
for _, msgDoc := range msgDocs {
|
||||
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
|
||||
continue
|
||||
}
|
||||
var index int64
|
||||
for index < int64(len(msgDoc.Msg)) {
|
||||
msg := msgDoc.Msg[index]
|
||||
if msg != nil && msg.Msg != nil {
|
||||
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
|
||||
end := index + m.model.GetSingleGocMsgNum()
|
||||
if int(end) >= len(msgDoc.Msg) {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:]
|
||||
} else {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:end]
|
||||
}
|
||||
newMsgDocs = append(newMsgDocs, msgDocModel)
|
||||
index = end
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
} else {
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -16,12 +16,14 @@ package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
)
|
||||
|
||||
// prefixes and suffixes.
|
||||
|
||||
@ -23,7 +23,7 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func GetNotificationConversationID(msg *sdkws.MsgData) string {
|
||||
func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string {
|
||||
switch msg.SessionType {
|
||||
case constant.SingleChatType:
|
||||
l := []string{msg.SendID, msg.RecvID}
|
||||
@ -114,6 +114,30 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetNotificationConversationIDByConversationID(conversationID string) string {
|
||||
l := strings.Split(conversationID, "_")
|
||||
if len(l) > 1 {
|
||||
l[0] = "n"
|
||||
return strings.Join(l, "_")
|
||||
} else {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func GetNotificationConversationID(sessionType int, ids ...string) string {
|
||||
sort.Strings(ids)
|
||||
if len(ids) > 2 || len(ids) < 1 {
|
||||
return ""
|
||||
}
|
||||
switch sessionType {
|
||||
case constant.SingleChatType:
|
||||
return "n_" + strings.Join(ids, "_") // single chat
|
||||
case constant.SuperGroupChatType:
|
||||
return "n_" + ids[0] // super group chat
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func IsNotification(conversationID string) bool {
|
||||
return strings.HasPrefix(conversationID, "n_")
|
||||
}
|
||||
|
||||
@ -220,7 +220,13 @@ func (g *GroupNotificationSender) getUsersInfoMap(ctx context.Context, userIDs [
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) error {
|
||||
func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if opUser == nil {
|
||||
return errs.ErrInternalServer.Wrap("**sdkws.GroupMemberFullInfo is nil")
|
||||
}
|
||||
@ -260,6 +266,12 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -267,6 +279,12 @@ func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context,
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -274,6 +292,12 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context,
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -281,6 +305,12 @@ func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -288,6 +318,12 @@ func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbGroup.JoinGroupReq) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -355,6 +391,12 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbGroup.GroupApplicationResponseReq) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -377,6 +419,12 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbGroup.TransferGroupOwnerReq) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -394,6 +442,12 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -401,6 +455,12 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -419,12 +479,18 @@ func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context,
|
||||
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, req *pbGroup.GroupApplicationResponseReq) (err error) {
|
||||
group, err := g.getGroupInfo(ctx, req.GroupID)
|
||||
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user, err := g.getGroupMember(ctx, req.GroupID, req.FromUserID)
|
||||
user, err := g.getGroupMember(ctx, groupID, entrantUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -433,6 +499,12 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, r
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -440,6 +512,12 @@ func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -459,6 +537,12 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -475,6 +559,12 @@ func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -494,6 +584,12 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -513,6 +609,12 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -529,6 +631,12 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -545,6 +653,12 @@ func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -560,25 +674,6 @@ func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx c
|
||||
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips)
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) MemberEnterDirectlyNotification(ctx context.Context, groupID string, entrantUserID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.ZError(ctx, utils.GetFuncName(1)+" failed", err)
|
||||
}
|
||||
}()
|
||||
group, err := g.getGroupInfo(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user, err := g.getGroupMember(ctx, groupID, entrantUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
|
||||
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
|
||||
}
|
||||
|
||||
func (g *GroupNotificationSender) SuperGroupNotification(ctx context.Context, sendID, recvID string) (err error) {
|
||||
defer log.ZDebug(ctx, "return")
|
||||
defer func() {
|
||||
|
||||
12
scripts/start.bat
Normal file
12
scripts/start.bat
Normal file
@ -0,0 +1,12 @@
|
||||
cd %~p0../_output/bin/platforms/windows
|
||||
start api.exe -p 10002
|
||||
start auth.exe -p 10060
|
||||
start conversation.exe -p 10080
|
||||
start friend.exe -p 10020
|
||||
start group.exe -p 10050
|
||||
start msg.exe -p 10030
|
||||
start msggateway.exe -p 10040 -w 10001
|
||||
start msgtransfer.exe
|
||||
start third.exe -p 10090
|
||||
start push.exe -p 10070
|
||||
start user.exe -p 10010
|
||||
@ -1,5 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@ -24,12 +23,13 @@ source $OPENIM_ROOT/scripts/path_info.sh
|
||||
|
||||
bin_dir="$BIN_DIR"
|
||||
logs_dir="$OPENIM_ROOT/logs"
|
||||
sdk_db_dir="$OPENIM_ROOT/sdk/db/"
|
||||
|
||||
cd "$SCRIPTS_ROOT"
|
||||
|
||||
for i in ${service_names[*]}; do
|
||||
#Check whether the service exists
|
||||
name="ps |grep -w $i |grep -v grep"
|
||||
name="ps -aux |grep -w $i |grep -v grep"
|
||||
count="${name}| wc -l"
|
||||
if [ $(eval ${count}) -gt 0 ]; then
|
||||
pid="${name}| awk '{print \$2}'"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user