Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
Gordon 2023-02-16 16:33:04 +08:00
commit 1f7d2d73c7
71 changed files with 1415 additions and 882 deletions

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
_ "Open_IM/cmd/open_im_api/docs"
apiAuth "Open_IM/internal/api/auth" apiAuth "Open_IM/internal/api/auth"
"Open_IM/internal/api/conversation" "Open_IM/internal/api/conversation"
"Open_IM/internal/api/friend" "Open_IM/internal/api/friend"
@ -14,7 +13,6 @@ import (
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware" "Open_IM/pkg/common/middleware"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
"flag" "flag"
"fmt" "fmt"
@ -28,7 +26,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
//"syscall" //"syscall"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
) )
// @title open-IM-Server API // @title open-IM-Server API

View File

@ -4,7 +4,7 @@ import (
rpcConversation "Open_IM/internal/rpc/conversation" rpcConversation "Open_IM/internal/rpc/conversation"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"flag" "flag"
"fmt" "fmt"
) )

1
go.mod
View File

@ -56,7 +56,6 @@ require (
) )
require ( require (
github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/go-openapi/spec v0.20.6 // indirect github.com/go-openapi/spec v0.20.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect

1
go.sum
View File

@ -510,7 +510,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=

View File

@ -1,7 +1,7 @@
package apiAuth package apiAuth
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"

View File

@ -1,10 +1,9 @@
package conversation package conversation
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user" pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -2,7 +2,7 @@ package friend
//import ( //import (
// jsonData "Open_IM/internal/utils" // jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct" // api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config" // "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log" // "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/tokenverify" // "Open_IM/pkg/common/tokenverify"

View File

@ -1,8 +1,8 @@
package friend package friend
import ( import (
common "Open_IM/internal/api_to_rpc" common "Open_IM/internal/api2rpc"
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
rpc "Open_IM/pkg/proto/friend" rpc "Open_IM/pkg/proto/friend"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"

View File

@ -2,7 +2,7 @@ package group
//import ( //import (
// common "Open_IM/internal/api_to_rpc" // common "Open_IM/internal/api_to_rpc"
// api "Open_IM/pkg/api_struct" // api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config" // "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/constant" // "Open_IM/pkg/common/constant"
// "Open_IM/pkg/common/log" // "Open_IM/pkg/common/log"

View File

@ -1,7 +1,7 @@
package group package group
import ( import (
"Open_IM/pkg/api_struct" "Open_IM/pkg/apistruct"
"Open_IM/pkg/proto/group" "Open_IM/pkg/proto/group"
"context" "context"
"errors" "errors"

View File

@ -2,7 +2,7 @@ package group
//import ( //import (
// jsonData "Open_IM/internal/utils" // jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct" // api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config" // "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log" // "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/token_verify" // "Open_IM/pkg/common/token_verify"

View File

@ -7,10 +7,9 @@
package manage package manage
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
pbChat "Open_IM/pkg/proto/msg" pbChat "Open_IM/pkg/proto/msg"

View File

@ -7,12 +7,11 @@
package manage package manage
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
rpc "Open_IM/pkg/proto/user" rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -1,12 +1,11 @@
package msg package msg
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg" rpc "Open_IM/pkg/proto/msg"
pbCommon "Open_IM/pkg/proto/sdkws" pbCommon "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -1,12 +1,11 @@
package msg package msg
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg" rpc "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"

View File

@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbChat "Open_IM/pkg/proto/msg" pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"
"context" "context"

View File

@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -7,7 +7,6 @@ import (
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"
"context" "context"
"Open_IM/pkg/getcdv3"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "net/http"
"strings" "strings"

View File

@ -1,7 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"

View File

@ -1,7 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"

View File

@ -1,8 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -1,10 +1,9 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
_ "Open_IM/pkg/common/tokenverify" _ "Open_IM/pkg/common/tokenverify"

View File

@ -1,8 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -1,8 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -1,7 +1,7 @@
package apiThird package apiThird
import ( import (
api "Open_IM/pkg/api_struct" api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"

View File

@ -1,17 +1,16 @@
package user package user
import ( import (
jsonData "Open_IM/internal/utils" api "Open_IM/pkg/apistruct"
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
cacheRpc "Open_IM/pkg/proto/cache"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"
rpc "Open_IM/pkg/proto/user" rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
jsonData "Open_IM/pkg/utils"
"context" "context"
"net/http" "net/http"
"strings" "strings"

View File

@ -3,92 +3,77 @@ package cronTask
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"math" "math"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
) )
type SeqCheckInterface interface {
ClearAll() error type ClearMsgTool struct {
msgInterface controller.MsgInterface
userInterface controller.UserInterface
groupInterface controller.GroupInterface
} }
type ClearMsgCronTask struct { func (c *ClearMsgTool) getCronTaskOperationID() string {
msgModel controller.MsgInterface
userModel controller.UserInterface
groupModel controller.GroupInterface
cache cache.Cache
}
func (c *ClearMsgCronTask) getCronTaskOperationID() string {
return cronTaskOperationID + utils.OperationIDGenerator() return cronTaskOperationID + utils.OperationIDGenerator()
} }
func (c *ClearMsgCronTask) ClearAll() { func (c *ClearMsgTool) ClearAll() {
operationID := c.getCronTaskOperationID() operationID := c.getCronTaskOperationID()
ctx := context.Background() ctx := context.Background()
tracelog.SetOperationID(ctx, operationID) tracelog.SetOperationID(ctx, operationID)
log.NewInfo(operationID, "========================= start del cron task =========================") log.NewInfo(operationID, "============================ start del cron task ============================")
var err error var err error
userIDList, err := c.userModel.GetAllUserID(ctx) userIDList, err := c.userInterface.GetAllUserID(ctx)
if err == nil { if err == nil {
c.StartClearMsg(operationID, userIDList) c.ClearUsersMsg(ctx, userIDList)
} else { } else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
} }
// working group msg clear // working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) workingGroupIDList, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err == nil { if err == nil {
c.StartClearWorkingGroupMsg(operationID, workingGroupIDList) c.ClearSuperGroupMsg(ctx, workingGroupIDList)
} else { } else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
} }
log.NewInfo(operationID, "============================ start del cron finished ============================")
log.NewInfo(operationID, "========================= start del cron finished =========================")
} }
func (c *ClearMsgCronTask) StartClearMsg(operationID string, userIDList []string) { func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
for _, userID := range userIDList { for _, userID := range userIDList {
if err := DeleteUserMsgsAndSetMinSeq(operationID, userID); err != nil { if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
} }
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
}
func (c *ClearMsgCronTask) StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList)
for _, groupID := range workingGroupIDList {
userIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID)
if err != nil { if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID)
continue continue
} }
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "workingGroupIDList:", userIDList) if
if err := DeleteUserSuperGroupMsgsAndSetMinSeq(operationID, groupID, userIDList); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
} }
} }
func checkMaxSeqWithMongo(operationID, sourceID string, diffusionType int) error { func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) {
for _, groupID := range workingGroupIDList {
userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID)
if err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID)
continue
}
if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
}
func (c *ClearMsgTool) checkMaxSeqWithMongo(ctx context.Context, sourceID string, diffusionType int) error {
var seqRedis uint64 var seqRedis uint64
var err error var err error
if diffusionType == constant.WriteDiffusion { if diffusionType == constant.WriteDiffusion {

View File

@ -19,11 +19,11 @@ func StartCronTask(userID, workingGroupID string) {
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
if userID != "" { if userID != "" {
operationID := getCronTaskOperationID() operationID := getCronTaskOperationID()
StartClearMsg(operationID, []string{userID}) ClearUsersMsg(operationID, []string{userID})
} }
if workingGroupID != "" { if workingGroupID != "" {
operationID := getCronTaskOperationID() operationID := getCronTaskOperationID()
StartClearWorkingGroupMsg(operationID, []string{workingGroupID}) ClearSuperGroupMsg(operationID, []string{workingGroupID})
} }
if userID != "" || workingGroupID != "" { if userID != "" || workingGroupID != "" {
fmt.Println("clear msg finished") fmt.Println("clear msg finished")

View File

@ -2,7 +2,6 @@ package msggateway
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/msg" pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"

View File

@ -8,7 +8,7 @@ import (
"fmt" "fmt"
"sync" "sync"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
) )

View File

@ -3,7 +3,6 @@ package msggateway
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome" "Open_IM/pkg/common/prome"
pbChat "Open_IM/pkg/proto/msg" pbChat "Open_IM/pkg/proto/msg"

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"io/ioutil" "io/ioutil"
"open_im_sdk/pkg/utils"
) )
type Compressor interface { type Compressor interface {

View File

@ -3,7 +3,6 @@ package new
import ( import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"open_im_sdk/pkg/utils"
) )
type Encoder interface { type Encoder interface {

View File

@ -12,15 +12,13 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
) )
type RPCServer struct { type RPCServer struct {

View File

@ -3,9 +3,8 @@ package msggateway
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"

View File

@ -0,0 +1,232 @@
package objstorage
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math/rand"
"path"
"strconv"
"time"
)
func NewController(i Interface, kv KV) (*Controller, error) {
if err := i.Init(); err != nil {
return nil, err
}
return &Controller{
i: i,
kv: kv,
}, nil
}
type Controller struct {
i Interface
//i *minioImpl
kv KV
}
func (c *Controller) key(v string) string {
return "OBJECT_STORAGE:" + c.i.Name() + ":" + v
}
func (c *Controller) putKey(v string) string {
return c.key("put:" + v)
}
func (c *Controller) pathKey(v string) string {
return c.key("path:" + v)
}
func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutAddr, error) {
if data, err := c.kv.Get(ctx, c.pathKey(args.Hash)); err == nil {
// 服务器已存在
var src BucketFile
if err := json.Unmarshal([]byte(data), &src); err != nil {
return nil, err
}
var bucket string
if args.ClearTime <= 0 {
bucket = c.i.PermanentBucket()
} else {
bucket = c.i.ClearBucket()
}
dst := &BucketFile{
Bucket: bucket,
Name: args.Name,
}
// 直接拷贝一份
err := c.i.CopyObjetInfo(ctx, &src, dst)
if err == nil {
info, err := c.i.GetObjectInfo(ctx, dst)
if err != nil {
return nil, err
}
return &PutAddr{
ResourceURL: info.URL,
}, nil
} else if !c.i.IsNotFound(err) {
return nil, err
}
} else if !c.kv.IsNotFound(err) {
return nil, err
}
// 上传逻辑
name := args.Name
effective := time.Now().Add(args.EffectiveTime)
prefix := c.Prefix(&args.PutArgs)
var pack int64
if args.FragmentSize <= 0 || args.Size <= args.FragmentSize {
pack = 1
} else {
pack = args.Size / args.FragmentSize
if args.Size%args.FragmentSize > 0 {
pack++
}
}
p := path.Join(path.Dir(args.Name), time.Now().Format("20060102"))
info := putInfo{
Bucket: c.i.UploadBucket(),
Fragments: make([]string, 0, pack),
FragmentSize: args.FragmentSize,
Name: name,
Hash: args.Hash,
Size: args.Size,
}
if args.ClearTime > 0 {
t := time.Now().Add(args.ClearTime).UnixMilli()
info.ClearTime = &t
}
putURLs := make([]string, 0, pack)
for i := int64(1); i <= pack; i++ {
name := prefix + "_" + strconv.FormatInt(i, 10) + path.Ext(args.Name)
name = path.Join(p, name)
info.Fragments = append(info.Fragments, name)
args.Name = name
put, err := c.i.ApplyPut(ctx, &ApplyPutArgs{
Bucket: info.Bucket,
Name: name,
Effective: args.EffectiveTime,
Header: args.Header,
})
if err != nil {
return nil, err
}
putURLs = append(putURLs, put.URL)
}
data, err := json.Marshal(&info)
if err != nil {
return nil, err
}
if err := c.kv.Set(ctx, c.putKey(prefix), string(data), args.EffectiveTime); err != nil {
return nil, err
}
var fragmentSize int64
if pack == 1 {
fragmentSize = args.Size
} else {
fragmentSize = args.FragmentSize
}
return &PutAddr{
PutURLs: putURLs,
FragmentSize: fragmentSize,
PutID: prefix,
EffectiveTime: effective,
}, nil
}
func (c *Controller) ConfirmPut(ctx context.Context, putID string) (*ObjectInfo, error) {
data, err := c.kv.Get(ctx, c.putKey(putID))
if err != nil {
return nil, err
}
var info putInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
return nil, err
}
var total int64
src := make([]BucketFile, len(info.Fragments))
for i, fragment := range info.Fragments {
state, err := c.i.GetObjectInfo(ctx, &BucketFile{
Bucket: info.Bucket,
Name: fragment,
})
if err != nil {
return nil, err
}
total += state.Size
src[i] = BucketFile{
Bucket: info.Bucket,
Name: fragment,
}
}
if total != info.Size {
return nil, fmt.Errorf("incomplete upload %d/%d", total, info.Size)
}
var dst *BucketFile
if info.ClearTime == nil {
dst = &BucketFile{
Bucket: c.i.PermanentBucket(),
Name: info.Name,
}
} else {
dst = &BucketFile{
Bucket: c.i.ClearBucket(),
Name: info.Name,
}
}
if err := c.i.MergeObjectInfo(ctx, src, dst); err != nil { // SourceInfo 0 is too small (2) and it is not the last part
return nil, err
}
obj, err := c.i.GetObjectInfo(ctx, dst)
if err != nil {
return nil, err
}
go func() {
err := c.kv.Del(ctx, c.putKey(putID))
if err != nil {
log.Println("del key:", err)
}
for _, b := range src {
err = c.i.DeleteObjetInfo(ctx, &b)
if err != nil {
log.Println("del obj:", err)
}
}
}()
return obj, nil
}
func (c *Controller) Prefix(args *PutArgs) string {
buf := bytes.NewBuffer(nil)
buf.WriteString(args.Name)
buf.WriteString("~~~@~@~~~")
buf.WriteString(strconv.FormatInt(args.Size, 10))
buf.WriteString(",")
buf.WriteString(args.Hash)
buf.WriteString(",")
buf.WriteString(strconv.FormatInt(int64(args.ClearTime), 10))
buf.WriteString(",")
buf.WriteString(strconv.FormatInt(int64(args.EffectiveTime), 10))
buf.WriteString(",")
buf.WriteString(c.i.Name())
r := make([]byte, 16)
rand.Read(r)
buf.Write(r)
md5v := md5.Sum(buf.Bytes())
return hex.EncodeToString(md5v[:])
}
type putInfo struct {
Bucket string
Fragments []string
FragmentSize int64
Size int64
Name string
Hash string
ClearTime *int64
}

49
internal/objstorage/kv.go Normal file
View File

@ -0,0 +1,49 @@
package objstorage
import (
"context"
"github.com/go-redis/redis/v8"
"log"
"time"
)
type KV interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, val string, expiration time.Duration) error
Del(ctx context.Context, key string) error
IsNotFound(err error) bool
}
func NewKV() KV {
rdb := redis.NewClient(&redis.Options{
Addr: "",
Username: "",
Password: "",
})
return &redisImpl{
rdb: rdb,
}
}
type redisImpl struct {
rdb *redis.Client
}
func (r *redisImpl) Del(ctx context.Context, key string) error {
log.Println("redis del", key)
return r.rdb.Del(ctx, key).Err()
}
func (r *redisImpl) Get(ctx context.Context, key string) (string, error) {
log.Println("redis get", key)
return r.rdb.Get(ctx, key).Result()
}
func (r *redisImpl) Set(ctx context.Context, key string, val string, expiration time.Duration) error {
log.Println("redis set", key, val, expiration.String())
return r.rdb.Set(ctx, key, val, expiration).Err()
}
func (r *redisImpl) IsNotFound(err error) bool {
return err == redis.Nil
}

112
internal/objstorage/main.go Normal file
View File

@ -0,0 +1,112 @@
package objstorage
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"path"
"time"
)
func HttpPut(url string, body io.Reader) error {
req, err := http.NewRequest(http.MethodPut, url, body)
if err != nil {
return err
}
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http [%s] %s", resp.Status, data)
}
if len(data) > 0 {
log.Println("[http body]", string(data))
}
return nil
}
func Md5(p []byte) string {
t := md5.Sum(p)
return hex.EncodeToString(t[:])
}
func Main() {
ctx := context.Background()
c, err := NewController(&minioImpl{}, NewKV())
if err != nil {
log.Fatalln(err)
}
name := "hello.txt"
data := []byte("hello world")
userID := "10000"
name = path.Join("user_"+userID, name)
addr, err := c.ApplyPut(ctx, &FragmentPutArgs{
PutArgs: PutArgs{
Name: name,
Size: int64(len(data)),
Hash: Md5(data),
EffectiveTime: time.Second * 60 * 60,
},
FragmentSize: 2,
})
if err != nil {
log.Fatalln(err)
}
fmt.Println()
fmt.Println()
if addr.ResourceURL != "" {
log.Println("服务器已经存在")
return
}
var (
start int
end = int(addr.FragmentSize)
)
for _, u := range addr.PutURLs {
if start >= len(data) {
break
}
if end > len(data) {
end = len(data)
}
_ = u
page := data[start:end]
fmt.Print(string(page))
start += int(addr.FragmentSize)
end += int(addr.FragmentSize)
err = HttpPut(u, bytes.NewReader(page))
if err != nil {
log.Fatalln(err)
}
}
fmt.Println()
fmt.Println()
fmt.Println("[PUT_ID]", addr.PutID)
info, err := c.ConfirmPut(ctx, addr.PutID)
if err != nil {
log.Fatalln(err)
}
log.Printf("%+v\n", info)
log.Println("success")
}

View File

@ -0,0 +1,143 @@
package objstorage
import (
"context"
"errors"
"fmt"
"github.com/minio/minio-go"
"net/url"
"time"
)
//func NewMinio() Interface {
// return &minioImpl{}
//}
type minioImpl struct {
uploadBucket string // 上传桶
permanentBucket string // 永久桶
clearBucket string // 自动清理桶
client *minio.Client
}
func (m *minioImpl) Init() error {
client, err := minio.New("127.0.0.1:9000", "minioadmin", "minioadmin", false)
if err != nil {
return fmt.Errorf("minio client error: %w", err)
}
m.client = client
m.uploadBucket = "upload"
m.permanentBucket = "permanent"
m.clearBucket = "clear"
return nil
}
func (m *minioImpl) Name() string {
return "minio"
}
func (m *minioImpl) UploadBucket() string {
return m.uploadBucket
}
func (m *minioImpl) PermanentBucket() string {
return m.permanentBucket
}
func (m *minioImpl) ClearBucket() string {
return m.clearBucket
}
func (m *minioImpl) urlReplace(u *url.URL) {
}
func (m *minioImpl) ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) {
if args.Effective <= 0 {
return nil, errors.New("EffectiveTime <= 0")
}
_, err := m.GetObjectInfo(ctx, &BucketFile{
Bucket: m.uploadBucket,
Name: args.Name,
})
if err == nil {
return nil, fmt.Errorf("minio bucket %s name %s already exists", args.Bucket, args.Name)
} else if !m.IsNotFound(err) {
return nil, err
}
effective := time.Now().Add(args.Effective)
u, err := m.client.PresignedPutObject(m.uploadBucket, args.Name, args.Effective)
if err != nil {
return nil, fmt.Errorf("minio apply error: %w", err)
}
m.urlReplace(u)
return &PutRes{
URL: u.String(),
Bucket: m.uploadBucket,
Name: args.Name,
EffectiveTime: effective,
}, nil
}
func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) {
info, err := m.client.StatObject(args.Bucket, args.Name, minio.StatObjectOptions{})
if err != nil {
return nil, err
}
return &ObjectInfo{
URL: "", // todo
Size: info.Size,
Hash: info.ETag,
}, nil
}
func (m *minioImpl) CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
if err != nil {
return err
}
return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil))
}
func (m *minioImpl) DeleteObjetInfo(ctx context.Context, info *BucketFile) error {
return m.client.RemoveObject(info.Bucket, info.Name)
}
func (m *minioImpl) MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
if err := m.CopyObjetInfo(ctx, src, dst); err != nil {
return err
}
return m.DeleteObjetInfo(ctx, src)
}
func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error {
switch len(src) {
case 0:
return errors.New("src empty")
case 1:
return m.CopyObjetInfo(ctx, &src[0], dst)
}
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
if err != nil {
return err
}
sources := make([]minio.SourceInfo, len(src))
for i, s := range src {
sources[i] = minio.NewSourceInfo(s.Bucket, s.Name, nil)
}
return m.client.ComposeObject(destination, sources) // todo
}
func (m *minioImpl) IsNotFound(err error) bool {
if err == nil {
return false
}
switch e := err.(type) {
case minio.ErrorResponse:
return e.StatusCode == 404 && e.Code == "NoSuchKey"
case *minio.ErrorResponse:
return e.StatusCode == 404 && e.Code == "NoSuchKey"
default:
return false
}
}

18
internal/objstorage/oo.go Normal file
View File

@ -0,0 +1,18 @@
package objstorage
import "context"
type Interface interface {
Init() error
Name() string
UploadBucket() string
PermanentBucket() string
ClearBucket() string
ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error)
GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error)
CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
DeleteObjetInfo(ctx context.Context, info *BucketFile) error
MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error
IsNotFound(err error) bool
}

View File

@ -0,0 +1,69 @@
package objstorage
import (
"net/http"
"time"
)
type PutRes struct {
URL string
Bucket string
Name string
EffectiveTime time.Time
}
type FragmentPutArgs struct {
PutArgs
FragmentSize int64 // 分片大小
}
type PutArgs struct {
Name string // 文件名
Size int64 // 大小
Hash string // md5
Prefix string // 前缀
ClearTime time.Duration // 自动清理时间
EffectiveTime time.Duration // 申请有效时间
Header http.Header // header
}
type BucketFile struct {
Bucket string `json:"bucket"`
Name string `json:"name"`
}
type ObjectInfo struct {
URL string
Size int64
Hash string
}
//type PutSpace struct {
// URL string
// EffectiveTime time.Time
//}
type PutAddr struct {
ResourceURL string
PutID string
FragmentSize int64
EffectiveTime time.Time
PutURLs []string
}
type KVData struct {
Bucket string `json:"bucket"`
Name string `json:"name"`
}
type PutResp struct {
URL string
Time *time.Time
}
type ApplyPutArgs struct {
Bucket string
Name string
Effective time.Duration // 申请有效时间
Header http.Header // header
}

View File

@ -3,7 +3,6 @@ package fcm
import ( import (
"Open_IM/internal/push" "Open_IM/internal/push"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"context" "context"
go_redis "github.com/go-redis/redis/v8" go_redis "github.com/go-redis/redis/v8"

View File

@ -3,7 +3,6 @@ package getui
import ( import (
"Open_IM/internal/push" "Open_IM/internal/push"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"bytes" "bytes"

View File

@ -15,7 +15,7 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka" "Open_IM/pkg/common/kafka"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/statistics" "Open_IM/pkg/statistics"
"fmt" "fmt"
) )

View File

@ -3,10 +3,9 @@ package logic
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/getcdv3"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"

View File

@ -10,9 +10,8 @@ import (
"Open_IM/internal/push" "Open_IM/internal/push"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
pbRtc "Open_IM/pkg/proto/rtc" pbRtc "Open_IM/pkg/proto/rtc"
@ -20,7 +19,7 @@ import (
"context" "context"
"strings" "strings"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )

View File

@ -1,10 +1,7 @@
package logic package logic
import ( import (
tpns "Open_IM/internal/push/sdk/tpns-server-sdk-go/go"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth" "Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/common"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/req"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
) )

View File

@ -8,12 +8,10 @@ import (
"Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
tableRelation "Open_IM/pkg/common/db/table/relation" tableRelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prome"
"Open_IM/pkg/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"

View File

@ -32,7 +32,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin
AtUserIDList: msg.MsgData.AtUserIDList, AtUserIDList: msg.MsgData.AtUserIDList,
SenderFaceURL: msg.MsgData.SenderFaceURL, SenderFaceURL: msg.MsgData.SenderFaceURL,
Content: utils.GetContent(msg.MsgData), Content: utils.GetContent(msg.MsgData),
Seq: msg.MsgData.Seq, Seq: uint32(msg.MsgData.Seq),
Ex: msg.MsgData.Ex, Ex: msg.MsgData.Ex,
} }
} }

View File

@ -9,7 +9,7 @@ import (
func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) { func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) {
resp := &common.DelMsgListResp{} resp := &common.DelMsgListResp{}
if err := m.MsgInterface.DelMsgFromCache(ctx, req.UserID, req.SeqList); err != nil { if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil {
return nil, err return nil, err
} }
DeleteMessageNotification(ctx, req.UserID, req.SeqList) DeleteMessageNotification(ctx, req.UserID, req.SeqList)
@ -21,11 +21,14 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup
if err := tokenverify.CheckAdmin(ctx); err != nil { if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err return nil, err
} }
maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) //maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID)
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { //if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
// return nil, err
//}
if err := m.MsgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, req.UserID, 0); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
@ -36,8 +39,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { if err := m.MsgInterface.CleanUpUserMsg(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
//if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil {
// return nil, err
//}
return resp, nil return resp, nil
} }

View File

@ -3,7 +3,6 @@ package msg
import ( import (
"Open_IM/internal/common/notification" "Open_IM/internal/common/notification"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"

View File

@ -48,52 +48,51 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
MsgFirstModifyTime: setReq.MsgFirstModifyTime, MsgFirstModifyTime: setReq.MsgFirstModifyTime,
} }
resp := &cbapi.CallbackDeleteMessageReactionExtResp{} resp := &cbapi.CallbackDeleteMessageReactionExtResp{}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
return http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) return http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
} }
//func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error { func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error {
// if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
// return nil return nil
// } }
// req := cbapi.CallbackGetMessageListReactionExtReq{ req := cbapi.CallbackGetMessageListReactionExtReq{
// OperationID: getReq.OperationID, OperationID: getReq.OperationID,
// CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand, CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
// SourceID: getReq.SourceID, SourceID: getReq.SourceID,
// OpUserID: getReq.OpUserID, OpUserID: getReq.OpUserID,
// SessionType: getReq.SessionType, SessionType: getReq.SessionType,
// TypeKeyList: getReq.TypeKeyList, TypeKeyList: getReq.TypeKeyList,
// MessageKeyList: getReq.MessageReactionKeyList, MessageKeyList: getReq.MessageReactionKeyList,
// } }
// resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp} resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp}
// defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp) defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp)
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
// callbackResp.ErrCode = http2.StatusInternalServerError callbackResp.ErrCode = http2.StatusInternalServerError
// callbackResp.ErrMsg = err.Error() callbackResp.ErrMsg = err.Error()
// } }
// return resp return resp
//} }
//func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp {
// callbackResp := cbapi.CommonCallbackResp{OperationID: setReq.OperationID} func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp {
// log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) callbackResp := cbapi.CommonCallbackResp{}
// req := cbapi.CallbackAddMessageReactionExtReq{ req := cbapi.CallbackAddMessageReactionExtReq{
// OperationID: setReq.OperationID, OperationID: setReq.OperationID,
// CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand, CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
// SourceID: setReq.SourceID, SourceID: setReq.SourceID,
// OpUserID: setReq.OpUserID, OpUserID: setReq.OpUserID,
// SessionType: setReq.SessionType, SessionType: setReq.SessionType,
// ReactionExtensionList: setReq.ReactionExtensionList, ReactionExtensionList: setReq.ReactionExtensionList,
// ClientMsgID: setReq.ClientMsgID, ClientMsgID: setReq.ClientMsgID,
// IsReact: setReq.IsReact, IsReact: setReq.IsReact,
// IsExternalExtensions: setReq.IsExternalExtensions, IsExternalExtensions: setReq.IsExternalExtensions,
// MsgFirstModifyTime: setReq.MsgFirstModifyTime, MsgFirstModifyTime: setReq.MsgFirstModifyTime,
// } }
// resp := &cbapi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} resp := &cbapi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
// defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime) defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime)
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
// callbackResp.ErrCode = http2.StatusInternalServerError callbackResp.ErrCode = http2.StatusInternalServerError
// callbackResp.ErrMsg = err.Error() callbackResp.ErrMsg = err.Error()
// } }
// return resp return resp
//
//} }

View File

@ -1,7 +1,6 @@
package msg package msg
import ( import (
"Open_IM/pkg/common/db"
"time" "time"
) )

View File

@ -2,7 +2,7 @@ package msg
import ( import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prome"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"

View File

@ -9,7 +9,7 @@ import (
discoveryRegistry "Open_IM/pkg/discoveryregistry" discoveryRegistry "Open_IM/pkg/discoveryregistry"
"github.com/OpenIMSDK/openKeeper" "github.com/OpenIMSDK/openKeeper"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prome"
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

View File

@ -6,7 +6,7 @@ import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware" "Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prome"
"flag" "flag"
"fmt" "fmt"
"github.com/OpenIMSDK/openKeeper" "github.com/OpenIMSDK/openKeeper"

View File

@ -99,3 +99,26 @@ type CallbackDeleteMessageReactionExtResp struct {
ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"` ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"`
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
} }
type CallbackGetMessageListReactionExtReq struct {
OperationID string `json:"operationID"`
CallbackCommand string `json:"callbackCommand"`
SourceID string `json:"sourceID"`
OpUserID string `json:"opUserID"`
SessionType int32 `json:"sessionType"`
TypeKeyList []string `json:"typeKeyList"`
MessageKeyList []*msg.GetMessageListReactionExtensionsReq_MessageReactionKey `json:"messageKeyList"`
}
type CallbackAddMessageReactionExtReq struct {
OperationID string `json:"operationID"`
CallbackCommand string `json:"callbackCommand"`
SourceID string `json:"sourceID"`
OpUserID string `json:"opUserID"`
SessionType int32 `json:"sessionType"`
ReactionExtensionList map[string]*sdkws.KeyValue `json:"reactionExtensionList"`
ClientMsgID string `json:"clientMsgID"`
IsReact bool `json:"isReact"`
IsExternalExtensions bool `json:"isExternalExtensions"`
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
}

View File

@ -38,22 +38,24 @@ const (
) )
type Cache interface { type Cache interface {
IncrUserSeq(ctx context.Context, userID string) (uint64, error) IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (uint64, error) GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq uint64) error SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq uint64) (err error) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (uint64, error) GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error) GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error CleanUpOneUserAllMsg(ctx context.Context, userID string) error
@ -61,7 +63,7 @@ type Cache interface {
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []uint32) error DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error) GetGetuiToken(ctx context.Context) (string, error)
@ -138,66 +140,66 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
} }
// Perform seq auto-increment operation of user messages // Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (uint64, error) { func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid key := userIncrSeq + uid
seq, err := r.rdb.Incr(context.Background(), key).Result() seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err return seq, err
} }
// Get the largest Seq // Get the largest Seq
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (uint64, error) { func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid key := userIncrSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result() seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err return int64(utils.StringToInt(seq)), err
} }
// set the largest Seq // set the largest Seq
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq uint64) error { func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error {
key := userIncrSeq + uid key := userIncrSeq + uid
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
} }
// Set the user's minimum seq // Set the user's minimum seq
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq uint64) (err error) { func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) {
key := userMinSeq + uid key := userMinSeq + uid
return r.rdb.Set(context.Background(), key, minSeq, 0).Err() return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
} }
// Get the smallest Seq // Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (uint64, error) { func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) {
key := userMinSeq + uid key := userMinSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result() seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err return int64(utils.StringToInt(seq)), err
} }
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error) { func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err() return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
} }
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error) { func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(context.Background(), key).Result() seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err return int64(utils.StringToInt(seq)), err
} }
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) { func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
seq, err := r.rdb.Get(context.Background(), key).Result() seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err return int64(utils.StringToInt(seq)), err
} }
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) { func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(context.Background(), key).Result() seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err return seq, err
} }
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error { func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err() return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
} }
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error { func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID key := groupMinSeq + groupID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err() return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
} }
@ -231,7 +233,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl
return r.rdb.HDel(context.Background(), key, fields...).Err() return r.rdb.HDel(context.Background(), key, fields...).Err()
} }
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err2 error) { func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) {
for _, v := range seqList { for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v)) key := messageCache + userID + "_" + strconv.Itoa(int(v))
@ -398,7 +400,7 @@ func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) erro
return err return err
} }
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []uint32, operationID string) { func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) {
for _, seq := range seqList { for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq)) key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(context.Background(), key).Result() result, err := r.rdb.Get(context.Background(), key).Result()

View File

@ -26,6 +26,7 @@ type GroupInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员 DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember // GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
@ -91,6 +92,10 @@ func (g *GroupController) DismissGroup(ctx context.Context, groupID string) erro
return g.database.DismissGroup(ctx, groupID) return g.database.DismissGroup(ctx, groupID)
} }
func (g *GroupController) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
return g.database.
}
func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) { func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return g.database.TakeGroupMember(ctx, groupID, userID) return g.database.TakeGroupMember(ctx, groupID, userID)
} }
@ -182,6 +187,7 @@ type Group interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员 DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
} }
type GroupMember interface { type GroupMember interface {
@ -229,6 +235,8 @@ type GroupDataBaseInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员 DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember // GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)

View File

@ -11,7 +11,6 @@ import (
"github.com/gogo/protobuf/sortkeys" "github.com/gogo/protobuf/sortkeys"
"sync" "sync"
//"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg" pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
@ -25,30 +24,31 @@ import (
type MsgInterface interface { type MsgInterface interface {
// 批量插入消息到db // 批量插入消息到db
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存 // 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存 // incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList // 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在db里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在db里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 通过seqList获取db中写扩散消息 // 通过seqList获取db中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在db里面的消息 // 通过seqList获取大群在db里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/cache/db然后重置seq // 删除用户所有消息/cache/db然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取用户 seq mongo和redis
// SetSendMsgStatus GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// GetSendMsgStatus // 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
} }
func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface { func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
@ -59,35 +59,27 @@ type MsgController struct {
database MsgDatabase database MsgDatabase
} }
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error { func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq) return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
} }
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error { func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
return m.database.DeleteMessageFromCache(ctx, userID, msgList) return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
} }
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) { func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList) return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
} }
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) { func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
return m.database.DelMsgBySeqs(ctx, userID, seqs) return m.database.DelMsgBySeqs(ctx, userID, seqs)
} }
func (m *MsgController) GetNewestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) { func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetNewestMsg(ctx, ID)
}
func (m *MsgController) GetOldestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
return m.database.GetOldestMsg(ctx, ID)
}
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetMsgBySeqs(ctx, userID, seqs) return m.database.GetMsgBySeqs(ctx, userID, seqs)
} }
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
} }
@ -95,66 +87,87 @@ func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error
return m.database.CleanUpUserMsg(ctx, userID) return m.database.CleanUpUserMsg(ctx, userID)
} }
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error { func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
} }
func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
} }
func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
}
func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return m.database.SetUserMinSeq(ctx, userID, minSeq)
}
type MsgDatabaseInterface interface { type MsgDatabaseInterface interface {
// 批量插入消息 // 批量插入消息
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存 // 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存 // incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList // 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在mongo里面的消息 // 获取群ID或者UserID最新一条在mongo里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在mongo里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 通过seqList获取mongo中写扩散消息 // 通过seqList获取mongo中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息 // 通过seqList获取大群在 mongo里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/redis/mongo然后重置seq // 删除用户所有消息/redis/mongo然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
} }
type MsgDatabase struct { type MsgDatabase struct {
msgModel unRelationTb.MsgDocModelInterface mgo unRelationTb.MsgDocModelInterface
msgCache cache.Cache cache cache.Cache
msg unRelationTb.MsgDocModel msg unRelationTb.MsgDocModel
} }
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{} return &MsgDatabase{}
} }
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error { func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill() //newTime := utils.GetCurrentTimestampByMill()
if len(msgList) > db.msg.GetSingleGocMsgNum() { if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large") return errors.New("too large")
} }
var remain uint64 var remain int64
blk0 := uint64(db.msg.GetSingleGocMsgNum() - 1) blk0 := db.msg.GetSingleGocMsgNum() - 1
//currentMaxSeq 4998 //currentMaxSeq 4998
if currentMaxSeq < uint64(db.msg.GetSingleGocMsgNum()) { if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
remain = blk0 - currentMaxSeq //1 remain = blk0 - currentMaxSeq //1
} else { } else {
excludeBlk0 := currentMaxSeq - blk0 //=1 excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999 //(5000-1)%5000 == 4999
remain = (uint64(db.msg.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(db.msg.GetSingleGocMsgNum()))) % uint64(db.msg.GetSingleGocMsgNum()) remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
} }
//remain=1 //remain=1
insertCounter := uint64(0) var insertCounter int64
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0) msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0) msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
docID := "" docID := ""
@ -165,18 +178,18 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
currentMaxSeq++ currentMaxSeq++
sMsg := unRelationTb.MsgInfoModel{} sMsg := unRelationTb.MsgInfoModel{}
sMsg.SendTime = m.MsgData.SendTime sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq) m.MsgData.Seq = currentMaxSeq
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
if insertCounter < remain { if insertCounter < remain {
msgsToMongo = append(msgsToMongo, sMsg) msgsToMongo = append(msgsToMongo, sMsg)
insertCounter++ insertCounter++
docID = db.msg.GetDocID(sourceID, uint32(currentMaxSeq)) docID = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) //log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else { } else {
msgsToMongoNext = append(msgsToMongoNext, sMsg) msgsToMongoNext = append(msgsToMongoNext, sMsg)
docIDNext = db.msg.GetDocID(sourceID, uint32(currentMaxSeq)) docIDNext = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) //log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} }
} }
@ -185,13 +198,13 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
//filter := bson.M{"uid": seqUid} //filter := bson.M{"uid": seqUid}
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID) //log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err() //err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
err = db.msgModel.PushMsgsToDoc(ctx, docID, msgsToMongo) err = db.mgo.PushMsgsToDoc(ctx, docID, msgsToMongo)
if err != nil { if err != nil {
if err == mongo.ErrNoDocuments { if err == mongo.ErrNoDocuments {
doc := &unRelationTb.MsgDocModel{} doc := &unRelationTb.MsgDocModel{}
doc.DocID = docID doc.DocID = docID
doc.Msg = msgsToMongo doc.Msg = msgsToMongo
if err = db.msgModel.Create(ctx, doc); err != nil { if err = db.mgo.Create(ctx, doc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter) prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "") return utils.Wrap(err, "")
@ -211,7 +224,7 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
nextDoc.DocID = docIDNext nextDoc.DocID = docIDNext
nextDoc.Msg = msgsToMongoNext nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) //log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
if err = db.msgModel.Create(ctx, nextDoc); err != nil { if err = db.mgo.Create(ctx, nextDoc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter) prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "") return utils.Wrap(err, "")
@ -223,26 +236,26 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
} }
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
return db.msgCache.DeleteMessageFromCache(ctx, userID, msgs) return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
} }
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) { func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill() //newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList) lenList := len(msgList)
if lenList > db.msg.GetSingleGocMsgNum() { if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, errors.New("too large") return 0, errors.New("too large")
} }
if lenList < 1 { if lenList < 1 {
return 0, errors.New("too short as 0") return 0, errors.New("too short as 0")
} }
// judge sessionType to get seq // judge sessionType to get seq
var currentMaxSeq uint64 var currentMaxSeq int64
var err error var err error
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
currentMaxSeq, err = db.msgCache.GetGroupMaxSeq(ctx, sourceID) currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err) //log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
} else { } else {
currentMaxSeq, err = db.msgCache.GetUserMaxSeq(ctx, sourceID) currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err) //log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
} }
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
@ -253,11 +266,11 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
lastMaxSeq := currentMaxSeq lastMaxSeq := currentMaxSeq
for _, m := range msgList { for _, m := range msgList {
currentMaxSeq++ currentMaxSeq++
m.MsgData.Seq = uint32(currentMaxSeq) m.MsgData.Seq = currentMaxSeq
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq) //log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
} }
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList)) //log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
failedNum, err := db.msgCache.SetMessageToCache(ctx, sourceID, msgList) failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
if err != nil { if err != nil {
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum) prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID) //log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
@ -266,9 +279,9 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
} }
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList)) //log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = db.msgCache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq) err = db.cache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
} else { } else {
err = db.msgCache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq) err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
} }
if err != nil { if err != nil {
prome.PromeInc(prome.SeqSetFailedCounter) prome.PromeInc(prome.SeqSetFailedCounter)
@ -278,14 +291,14 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
return lastMaxSeq, utils.Wrap(err, "") return lastMaxSeq, utils.Wrap(err, "")
} }
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) { func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Uint32s(seqs) sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{} lock := sync.Mutex{}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap)) wg.Add(len(docIDSeqsMap))
for k, v := range docIDSeqsMap { for k, v := range docIDSeqsMap {
go func(docID string, seqs []uint32) { go func(docID string, seqs []int64) {
defer wg.Done() defer wg.Done()
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs) unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
if err != nil { if err != nil {
@ -299,26 +312,26 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []u
return totalUnExistSeqs, nil return totalUnExistSeqs, nil
} }
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (unExistSeqs []uint32, err error) { func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for i, v := range seqMsgs { for i, v := range seqMsgs {
if err = db.msgModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil { if err = db.mgo.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
return nil, err return nil, err
} }
} }
return unExistSeqs, nil return unExistSeqs, nil
} }
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []uint32, err error) { func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.msgModel.FindOneByDocID(ctx, docID) doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
singleCount := 0 singleCount := 0
var hasSeqList []uint32 var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ { for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i]) msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil { if err != nil {
@ -344,7 +357,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
} }
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetNewestMsg(ctx, sourceID) msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -352,7 +365,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
} }
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetOldestMsg(ctx, sourceID) msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -368,12 +381,12 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
return msgPb, nil return msgPb, nil
} }
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []uint32, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []uint32 var hasSeqs []int64
singleCount := 0 singleCount := 0
m := db.msg.GetDocIDSeqsMap(sourceID, seqs) m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
for docID, value := range m { for docID, value := range m {
doc, err := db.msgModel.FindOneByDocID(ctx, docID) doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil { if err != nil {
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error()) //log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
continue continue
@ -396,7 +409,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
} }
} }
if len(hasSeqs) != len(seqs) { if len(hasSeqs) != len(seqs) {
var diff []uint32 var diff []int64
var exceptionMsg []*sdkws.MsgData var exceptionMsg []*sdkws.MsgData
diff = utils.Difference(hasSeqs, seqs) diff = utils.Difference(hasSeqs, seqs)
if diffusionType == constant.WriteDiffusion { if diffusionType == constant.WriteDiffusion {
@ -409,8 +422,8 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
return seqMsg, nil return seqMsg, nil
} }
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, userID, seqs) successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -430,8 +443,8 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []u
return successMsgs, nil return successMsgs, nil
} }
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, groupID, seqs) successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -456,7 +469,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
if err != nil { if err != nil {
return err return err
} }
err = db.msgCache.CleanUpOneUserAllMsg(ctx, userID) err = db.cache.CleanUpOneUserAllMsg(ctx, userID)
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
@ -471,15 +484,15 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
} }
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq) //log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
for _, userID := range userIDs { for _, userID := range userIDs {
userMinSeq, err := db.msgCache.GetGroupUserMinSeq(ctx, groupID, userID) userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) //log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
continue continue
} }
if userMinSeq > uint64(minSeq) { if userMinSeq > minSeq {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq) err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
} else { } else {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, uint64(minSeq)) err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
} }
if err != nil { if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
@ -497,16 +510,16 @@ func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID st
if minSeq == 0 { if minSeq == 0 {
return nil return nil
} }
return db.msgCache.SetUserMinSeq(ctx, userID, uint64(minSeq)) return db.cache.SetUserMinSeq(ctx, userID, minSeq)
} }
// this is struct for recursion // this is struct for recursion
type delMsgRecursionStruct struct { type delMsgRecursionStruct struct {
minSeq uint32 minSeq int64
delDocIDList []string delDocIDList []string
} }
func (d *delMsgRecursionStruct) getSetMinSeq() uint32 { func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
return d.minSeq return d.minSeq
} }
@ -514,9 +527,9 @@ func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
// seq 70 // seq 70
// set minSeq 21 // set minSeq 21
// recursion 删除list并且返回设置的最小seq // recursion 删除list并且返回设置的最小seq
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (uint32, error) { func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list // find from oldest list
msgs, err := db.msgModel.GetMsgsByIndex(ctx, sourceID, index) msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
if err != nil || msgs.DocID == "" { if err != nil || msgs.DocID == "" {
if err != nil { if err != nil {
if err == unrelation.ErrMsgListNotExist { if err == unrelation.ErrMsgListNotExist {
@ -526,14 +539,14 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
} }
} }
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList) // 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
err = db.msgModel.Delete(ctx, delStruct.delDocIDList) err = db.mgo.Delete(ctx, delStruct.delDocIDList)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return delStruct.getSetMinSeq() + 1, nil return delStruct.getSetMinSeq() + 1, nil
} }
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) //log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if len(msgs.Msg) > db.msg.GetSingleGocMsgNum() { if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.NewWarn(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "msgs too large:", len(msgs.Msg), "docID:", msgs.DocID) log.NewWarn(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "msgs too large:", len(msgs.Msg), "docID:", msgs.DocID)
} }
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
@ -561,11 +574,11 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
msg.SendTime = 0 msg.SendTime = 0
hasMarkDelFlag = true hasMarkDelFlag = true
} else { } else {
if err := db.msgModel.Delete(ctx, delStruct.delDocIDList); err != nil { if err := db.mgo.Delete(ctx, delStruct.delDocIDList); err != nil {
return 0, err return 0, err
} }
if hasMarkDelFlag { if hasMarkDelFlag {
if err := db.msgModel.UpdateOneDoc(ctx, msgs); err != nil { if err := db.mgo.UpdateOneDoc(ctx, msgs); err != nil {
return delStruct.getSetMinSeq(), utils.Wrap(err, "") return delStruct.getSetMinSeq(), utils.Wrap(err, "")
} }
} }
@ -578,3 +591,62 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime) seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
return seq, utils.Wrap(err, "deleteMsg failed") return seq, utils.Wrap(err, "deleteMsg failed")
} }
func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
// from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil {
return 0, 0, err
}
minSeqMongo = msgPb.Seq
newestMsgMongo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil {
return 0, 0, err
}
maxSeqMongo = msgPb.Seq
return
}
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}

View File

@ -67,3 +67,10 @@ func (g *GroupGorm) Search(ctx context.Context, keyword string, pageNumber, show
}() }()
return gormSearch[relation.GroupModel](getDBConn(g.DB, tx), []string{"name"}, keyword, pageNumber, showNumber) return gormSearch[relation.GroupModel](getDBConn(g.DB, tx), []string{"name"}, keyword, pageNumber, showNumber)
} }
func (g *GroupGorm) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
if err := g.DB.Model(&relation.GroupModel{}).Where("group_type = ? ", groupType).Pluck("group_id", &groupIDs).Error; err != nil {
return nil, utils.Wrap(err, "")
}
return groupIDs, nil
}

View File

@ -41,7 +41,7 @@ func (MsgDocModel) TableName() string {
return CChat return CChat
} }
func (MsgDocModel) GetSingleGocMsgNum() int { func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum return singleGocMsgNum
} }
@ -59,36 +59,36 @@ func (m *MsgDocModel) IsFull() bool {
return false return false
} }
func (m MsgDocModel) GetDocID(sourceID string, seq uint32) string { func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix) return m.indexGen(sourceID, seqSuffix)
} }
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq uint32) []string { func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ { for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, uint32(i)) seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID) seqUserIDs = append(seqUserIDs, seqUserID)
} }
return seqUserIDs return seqUserIDs
} }
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string { func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix) return m.superGroupIndexGen(groupID, seqSuffix)
} }
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix uint32) string { func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
} }
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []uint32) map[string][]uint32 { func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]uint32) t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ { for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i]) docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok { if value, ok := t[docID]; !ok {
var temp []uint32 var temp []int64
t[docID] = append(temp, seqs[i]) t[docID] = append(temp, seqs[i])
} else { } else {
t[docID] = append(value, seqs[i]) t[docID] = append(value, seqs[i])
@ -108,11 +108,11 @@ func (m MsgDocModel) getMsgIndex(seq uint32) int {
return int(index) return int(index)
} }
func (m MsgDocModel) indexGen(sourceID string, seqSuffix uint32) string { func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(int64(seqSuffix), 10) return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdkws.MsgData) { func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs { for _, v := range seqs {
msg := new(sdkws.MsgData) msg := new(sdkws.MsgData)
msg.Seq = v msg.Seq = v
@ -121,7 +121,7 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdk
return exceptionMsg return exceptionMsg
} }
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) { func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs { for _, v := range seqs {
msg := new(sdkws.MsgData) msg := new(sdkws.MsgData)
msg.Seq = v msg.Seq = v

View File

@ -9,7 +9,7 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
prome "Open_IM/pkg/common/prometheus" prome "Open_IM/pkg/common/prome"
) )
type Producer struct { type Producer struct {

View File

@ -239,17 +239,6 @@ func NewWarn(OperationID string, args ...interface{}) {
func ShowLog(ctx context.Context) { func ShowLog(ctx context.Context) {
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
OperationID := tracelog.GetOperationID(ctx) OperationID := tracelog.GetOperationID(ctx)
//if ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo).GinCtx != nil {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("api: ", t.ApiName)
//} else {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("rpc: ", t.ApiName)
//}
for _, v := range *t.Funcs { for _, v := range *t.Funcs {
if v.Err != nil { if v.Err != nil {

37
pkg/proto/file/file.proto Normal file
View File

@ -0,0 +1,37 @@
syntax = "proto3";
option go_package = "Open_IM/pkg/proto/file;file";
package file;
message ApplySpaceReq {
string name = 1; //
int64 size = 2; //
string hash = 3; // md5
uint32 purpose = 4; //
string contentType = 5;
}
message ApplySpaceResp {
string url = 1; //
int64 size = 2; //
repeated string put = 3;// put地址
string confirmID = 4; // ID
}
message ConfirmSpaceReq {
string confirmID = 1; // ID
}
message ConfirmSpaceResp {
string confirmID = 1;
}
service file {
rpc ApplySpaceReq(ApplySpaceReq) returns(ApplySpaceResp);
}

View File

@ -223,6 +223,13 @@ message MsgDataToModifyByMQ{
string triggerID = 3; string triggerID = 3;
} }
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
service msg { service msg {
//seq //seq
@ -232,7 +239,7 @@ service msg {
// //
rpc SendMsg(SendMsgReq) returns(SendMsgResp); rpc SendMsg(SendMsgReq) returns(SendMsgResp);
// //
rpc DelMsgList(sdkws.DelMsgListReq) returns(sdkws.DelMsgListResp); rpc DelMsgList(DelMsgListReq) returns(DelMsgListResp);
// //
rpc DelSuperGroupMsg(DelSuperGroupMsgReq) returns(DelSuperGroupMsgResp); rpc DelSuperGroupMsg(DelSuperGroupMsgReq) returns(DelSuperGroupMsgResp);
// //

View File

@ -10,4 +10,5 @@ all_proto=(
relay/relay.proto relay/relay.proto
sdkws/ws.proto sdkws/ws.proto
conversation/conversation.proto conversation/conversation.proto
file/file.proto
) )

File diff suppressed because it is too large Load Diff

View File

@ -132,12 +132,12 @@ message FriendRequest{
///////////////////////////////////base end///////////////////////////////////// ///////////////////////////////////base end/////////////////////////////////////
message PullMessageBySeqListReq{ message PullMessageBySeqListReq{
string userID = 1; string userID = 1;
repeated uint32 seqList = 3; repeated int64 seqs = 3;
map <string, seqList>groupSeqList = 4; map <string, seqList>groupSeqList = 4;
} }
message seqList { message seqList {
repeated uint32 seqList = 1; repeated int64 seqs = 1;
} }
@ -159,12 +159,12 @@ message GetMaxAndMinSeqReq {
string userID = 2; string userID = 2;
} }
message MaxAndMinSeq{ message MaxAndMinSeq{
uint32 maxSeq = 1; int64 maxSeq = 1;
uint32 minSeq = 2; int64 minSeq = 2;
} }
message GetMaxAndMinSeqResp { message GetMaxAndMinSeqResp {
uint32 maxSeq = 1; int64 maxSeq = 1;
uint32 minSeq = 2; int64 minSeq = 2;
map<string, MaxAndMinSeq> groupMaxAndMinSeq = 5; map<string, MaxAndMinSeq> groupMaxAndMinSeq = 5;
} }
@ -187,7 +187,7 @@ message MsgData {
int32 msgFrom = 10; int32 msgFrom = 10;
int32 contentType = 11; int32 contentType = 11;
bytes content = 12; bytes content = 12;
uint32 seq = 14; int64 seq = 14;
int64 sendTime = 15; int64 sendTime = 15;
int64 createTime = 16; int64 createTime = 16;
int32 status = 17; int32 status = 17;
@ -426,7 +426,7 @@ message ConversationSetPrivateTips{
message DeleteMessageTips{ message DeleteMessageTips{
string opUserID = 1; string opUserID = 1;
string userID = 2; string userID = 2;
repeated uint32 seqList = 3; repeated int64 seqs = 3;
} }
///cms ///cms
message RequestPagination { message RequestPagination {
@ -601,13 +601,7 @@ message SignalGetTokenByRoomIDReply {
} }
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
message SetAppBackgroundStatusReq { message SetAppBackgroundStatusReq {
string userID = 1; string userID = 1;
@ -642,9 +636,3 @@ message KeyValue {
int64 latestUpdateTime = 3; int64 latestUpdateTime = 3;
} }
message ResponsePagination {
int32 CurrentPage = 5;
int32 ShowNumber = 6;
}

View File

@ -57,9 +57,9 @@ func cleanUpFuncName(funcName string) string {
} }
// Get the intersection of two slices // Get the intersection of two slices
func Intersect(slice1, slice2 []uint32) []uint32 { func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[uint32]bool) m := make(map[int64]bool)
n := make([]uint32, 0) n := make([]int64, 0)
for _, v := range slice1 { for _, v := range slice1 {
m[v] = true m[v] = true
} }
@ -73,9 +73,9 @@ func Intersect(slice1, slice2 []uint32) []uint32 {
} }
// Get the diff of two slices // Get the diff of two slices
func Difference(slice1, slice2 []uint32) []uint32 { func Difference(slice1, slice2 []int64) []int64 {
m := make(map[uint32]bool) m := make(map[int64]bool)
n := make([]uint32, 0) n := make([]int64, 0)
inter := Intersect(slice1, slice2) inter := Intersect(slice1, slice2)
for _, v := range inter { for _, v := range inter {
m[v] = true m[v] = true