Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-02-16 16:20:02 +08:00
commit fe2cf94d36
63 changed files with 819 additions and 887 deletions

View File

@ -1,7 +1,6 @@
package main
import (
_ "Open_IM/cmd/open_im_api/docs"
apiAuth "Open_IM/internal/api/auth"
"Open_IM/internal/api/conversation"
"Open_IM/internal/api/friend"
@ -14,7 +13,6 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
"flag"
"fmt"
@ -28,7 +26,7 @@ import (
"github.com/gin-gonic/gin"
//"syscall"
"Open_IM/pkg/common/constant"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
)
// @title open-IM-Server API

View File

@ -4,7 +4,7 @@ import (
rpcConversation "Open_IM/internal/rpc/conversation"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"flag"
"fmt"
)

1
go.mod
View File

@ -56,7 +56,6 @@ require (
)
require (
github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/go-openapi/spec v0.20.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-playground/locales v0.14.1 // indirect

1
go.sum
View File

@ -510,7 +510,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=

View File

@ -1,7 +1,7 @@
package apiAuth
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"

View File

@ -1,10 +1,9 @@
package conversation
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"

View File

@ -2,7 +2,7 @@ package friend
//import (
// jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/tokenverify"

View File

@ -1,8 +1,8 @@
package friend
import (
common "Open_IM/internal/api_to_rpc"
api "Open_IM/pkg/api_struct"
common "Open_IM/internal/api2rpc"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
rpc "Open_IM/pkg/proto/friend"
"github.com/gin-gonic/gin"

View File

@ -2,7 +2,7 @@ package group
//import (
// common "Open_IM/internal/api_to_rpc"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/constant"
// "Open_IM/pkg/common/log"

View File

@ -1,7 +1,7 @@
package group
import (
"Open_IM/pkg/api_struct"
"Open_IM/pkg/apistruct"
"Open_IM/pkg/proto/group"
"context"
"errors"

View File

@ -2,7 +2,7 @@ package group
//import (
// jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/token_verify"

View File

@ -7,10 +7,9 @@
package manage
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
pbChat "Open_IM/pkg/proto/msg"

View File

@ -7,12 +7,11 @@
package manage
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"

View File

@ -1,12 +1,11 @@
package msg
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg"
pbCommon "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"

View File

@ -1,12 +1,11 @@
package msg
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"

View File

@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"context"

View File

@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
"Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"

View File

@ -7,7 +7,6 @@ import (
sdkws "Open_IM/pkg/proto/sdkws"
"context"
"Open_IM/pkg/getcdv3"
"github.com/gin-gonic/gin"
"net/http"
"strings"

View File

@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"

View File

@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"

View File

@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"

View File

@ -1,10 +1,9 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
_ "Open_IM/pkg/common/tokenverify"

View File

@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"

View File

@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"

View File

@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"

View File

@ -1,17 +1,16 @@
package user
import (
jsonData "Open_IM/internal/utils"
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
cacheRpc "Open_IM/pkg/proto/cache"
pbRelay "Open_IM/pkg/proto/relay"
sdkws "Open_IM/pkg/proto/sdkws"
rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
jsonData "Open_IM/pkg/utils"
"context"
"net/http"
"strings"

View File

@ -3,92 +3,77 @@ package cronTask
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/mongo"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
"math"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
type SeqCheckInterface interface {
ClearAll() error
type ClearMsgTool struct {
msgInterface controller.MsgInterface
userInterface controller.UserInterface
groupInterface controller.GroupInterface
}
type ClearMsgCronTask struct {
msgModel controller.MsgInterface
userModel controller.UserInterface
groupModel controller.GroupInterface
cache cache.Cache
}
func (c *ClearMsgCronTask) getCronTaskOperationID() string {
func (c *ClearMsgTool) getCronTaskOperationID() string {
return cronTaskOperationID + utils.OperationIDGenerator()
}
func (c *ClearMsgCronTask) ClearAll() {
func (c *ClearMsgTool) ClearAll() {
operationID := c.getCronTaskOperationID()
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
log.NewInfo(operationID, "========================= start del cron task =========================")
log.NewInfo(operationID, "============================ start del cron task ============================")
var err error
userIDList, err := c.userModel.GetAllUserID(ctx)
userIDList, err := c.userInterface.GetAllUserID(ctx)
if err == nil {
c.StartClearMsg(operationID, userIDList)
c.ClearUsersMsg(ctx, userIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
// working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
workingGroupIDList, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err == nil {
c.StartClearWorkingGroupMsg(operationID, workingGroupIDList)
c.ClearSuperGroupMsg(ctx, workingGroupIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
log.NewInfo(operationID, "========================= start del cron finished =========================")
log.NewInfo(operationID, "============================ start del cron finished ============================")
}
func (c *ClearMsgCronTask) StartClearMsg(operationID string, userIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) {
for _, userID := range userIDList {
if err := DeleteUserMsgsAndSetMinSeq(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID)
if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
}
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
}
func (c *ClearMsgCronTask) StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList)
for _, groupID := range workingGroupIDList {
userIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID)
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID)
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID)
continue
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "workingGroupIDList:", userIDList)
if err := DeleteUserSuperGroupMsgsAndSetMinSeq(operationID, groupID, userIDList); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
if
}
}
func checkMaxSeqWithMongo(operationID, sourceID string, diffusionType int) error {
func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) {
for _, groupID := range workingGroupIDList {
userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID)
if err != nil {
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID)
continue
}
if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
}
func (c *ClearMsgTool) checkMaxSeqWithMongo(ctx context.Context, sourceID string, diffusionType int) error {
var seqRedis uint64
var err error
if diffusionType == constant.WriteDiffusion {

View File

@ -19,11 +19,11 @@ func StartCronTask(userID, workingGroupID string) {
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
if userID != "" {
operationID := getCronTaskOperationID()
StartClearMsg(operationID, []string{userID})
ClearUsersMsg(operationID, []string{userID})
}
if workingGroupID != "" {
operationID := getCronTaskOperationID()
StartClearWorkingGroupMsg(operationID, []string{workingGroupID})
ClearSuperGroupMsg(operationID, []string{workingGroupID})
}
if userID != "" || workingGroupID != "" {
fmt.Println("clear msg finished")

View File

@ -2,7 +2,6 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"

View File

@ -8,7 +8,7 @@ import (
"fmt"
"sync"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"github.com/go-playground/validator/v10"
)

View File

@ -3,7 +3,6 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
pbChat "Open_IM/pkg/proto/msg"

View File

@ -2,21 +2,20 @@ package new
import (
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/utils"
"bytes"
"context"
"errors"
"fmt"
"github.com/envoyproxy/protoc-gen-validate/validate"
"github.com/go-playground/validator/v10"
"open_im_sdk/pkg/log"
"open_im_sdk/pkg/utils"
"runtime/debug"
"sync"
"time"
)
const (
// MessageText is for UTF-8 encoded text messages like JSON.
MessageText = iota + 1
MessageText = iota + 1
// MessageBinary is for binary messages like protobufs.
MessageBinary
// CloseMessage denotes a close control message. The optional message
@ -34,48 +33,51 @@ const (
)
type Client struct {
w *sync.Mutex
conn LongConn
PlatformID int32
PushedMaxSeq uint32
IsCompress bool
userID string
IsBackground bool
token string
connID string
onlineAt int64 // 上线时间戳(毫秒)
handler MessageHandler
unregisterChan chan *Client
w *sync.Mutex
conn LongConn
PlatformID int32
PushedMaxSeq uint32
IsCompress bool
userID string
IsBackground bool
token string
connID string
onlineAt int64 // 上线时间戳(毫秒)
handler MessageHandler
unregisterChan chan *Client
compressor Compressor
encoder Encoder
userContext UserConnContext
validate *validator.Validate
userContext UserConnContext
validate *validator.Validate
closed bool
}
func newClient( conn LongConn,isCompress bool, userID string, isBackground bool, token string,
connID string, onlineAt int64, handler MessageHandler,unregisterChan chan *Client) *Client {
func newClient(conn LongConn, isCompress bool, userID string, isBackground bool, token string,
connID string, onlineAt int64, handler MessageHandler, unregisterChan chan *Client) *Client {
return &Client{
conn: conn,
conn: conn,
IsCompress: isCompress,
userID: userID, IsBackground:
isBackground, token: token,
connID: connID,
onlineAt: onlineAt,
handler: handler,
unregisterChan: unregisterChan,
userID: userID, IsBackground: isBackground, token: token,
connID: connID,
onlineAt: onlineAt,
handler: handler,
unregisterChan: unregisterChan,
}
}
func(c *Client) readMessage(){
func (c *Client) readMessage() {
defer func() {
if r:=recover(); r != nil {
if r := recover(); r != nil {
fmt.Println("socket have panic err:", r, string(debug.Stack()))
}
//c.close()
}()
var returnErr error
for {
messageType, message, returnErr := c.conn.ReadMessage()
if returnErr!=nil{
var returnErr error
for {
messageType, message, returnErr := c.conn.ReadMessage()
if returnErr != nil {
break
}
if c.closed == true {
break
}
switch messageType {
@ -89,7 +91,7 @@ func(c *Client) readMessage(){
continue
}
returnErr = c.handleMessage(message)
if returnErr!=nil{
if returnErr != nil {
break
}
@ -97,52 +99,88 @@ func(c *Client) readMessage(){
}
}
func (c *Client) handleMessage(message []byte)error {
if c.IsCompress {
func (c *Client) handleMessage(message []byte) error {
if c.IsCompress {
var decompressErr error
message,decompressErr = c.compressor.DeCompress(message)
message, decompressErr = c.compressor.DeCompress(message)
if decompressErr != nil {
return utils.Wrap(decompressErr,"")
return utils.Wrap(decompressErr, "")
}
}
var binaryReq Req
var binaryReq Req
err := c.encoder.Decode(message, &binaryReq)
if err != nil {
return utils.Wrap(err,"")
return utils.Wrap(err, "")
}
if err := c.validate.Struct(binaryReq); err != nil {
return utils.Wrap(err,"")
return utils.Wrap(err, "")
}
if binaryReq.SendID != c.userID {
return errors.New("exception conn userID not same to req userID")
}
ctx:=context.Background()
ctx =context.WithValue(ctx,"operationID",binaryReq.OperationID)
ctx = context.WithValue(ctx,"userID",binaryReq.SendID)
ctx := context.Background()
ctx = context.WithValue(ctx, "operationID", binaryReq.OperationID)
ctx = context.WithValue(ctx, "userID", binaryReq.SendID)
var messageErr error
var resp []byte
var resp []byte
switch binaryReq.ReqIdentifier {
case constant.WSGetNewestSeq:
resp,messageErr=c.handler.GetSeq(ctx,binaryReq)
resp, messageErr = c.handler.GetSeq(ctx, binaryReq)
case constant.WSSendMsg:
resp,messageErr=c.handler.SendMessage(ctx,binaryReq)
resp, messageErr = c.handler.SendMessage(ctx, binaryReq)
case constant.WSSendSignalMsg:
resp,messageErr=c.handler.SendSignalMessage(ctx,binaryReq)
resp, messageErr = c.handler.SendSignalMessage(ctx, binaryReq)
case constant.WSPullMsgBySeqList:
resp,messageErr=c.handler.PullMessageBySeqList(ctx,binaryReq)
resp, messageErr = c.handler.PullMessageBySeqList(ctx, binaryReq)
case constant.WsLogoutMsg:
resp,messageErr=c.handler.UserLogout(ctx,binaryReq)
resp, messageErr = c.handler.UserLogout(ctx, binaryReq)
case constant.WsSetBackgroundStatus:
resp,messageErr=c.handler.SetUserDeviceBackground(ctx,binaryReq)
resp, messageErr = c.handler.SetUserDeviceBackground(ctx, binaryReq)
default:
return errors.New(fmt.Sprintf("ReqIdentifier failed,sendID:%d,msgIncr:%s,reqIdentifier:%s",binaryReq.SendID,binaryReq.MsgIncr,binaryReq.ReqIdentifier))
return errors.New(fmt.Sprintf("ReqIdentifier failed,sendID:%d,msgIncr:%s,reqIdentifier:%s", binaryReq.SendID, binaryReq.MsgIncr, binaryReq.ReqIdentifier))
}
c.replyMessage(binaryReq, messageErr, resp)
return nil
}
func (c *Client) close() {
func (c *Client) close() {
c.w.Lock()
defer c.w.Unlock()
c.conn.Close()
c.unregisterChan <- c
}
func () {
func (c *Client) replyMessage(binaryReq Req, err error, resp []byte) {
mReply := Resp{
ReqIdentifier: binaryReq.ReqIdentifier,
MsgIncr: binaryReq.MsgIncr,
OperationID: binaryReq.OperationID,
Data: resp,
}
_ = c.writeMsg(mReply)
}
func (c *Client) writeMsg(resp Resp) error {
c.w.Lock()
defer c.w.Unlock()
if c.closed == true {
return nil
}
encodedBuf := bufferPool.Get().([]byte)
resultBuf := bufferPool.Get().([]byte)
encodeBuf, err := c.encoder.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
}
_ = c.conn.SetWriteTimeout(60)
if c.IsCompress {
var compressErr error
resultBuf, compressErr = c.compressor.Compress(encodeBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
} else {
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"io/ioutil"
"open_im_sdk/pkg/utils"
)
type Compressor interface {

View File

@ -3,7 +3,6 @@ package new
import (
"bytes"
"encoding/gob"
"open_im_sdk/pkg/utils"
)
type Encoder interface {

View File

@ -10,6 +10,14 @@ type Req struct {
MsgIncr string `json:"msgIncr" validate:"required"`
Data []byte `json:"data"`
}
type Resp struct {
ReqIdentifier int32 `json:"reqIdentifier"`
MsgIncr string `json:"msgIncr"`
OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data []byte `json:"data"`
}
type MessageHandler interface {
GetSeq(context context.Context, data Req) ([]byte, error)
SendMessage(context context.Context, data Req) ([]byte, error)

View File

@ -1,14 +1,21 @@
package new
import (
"bytes"
"errors"
"github.com/gorilla/websocket"
"net/http"
"open_im_sdk/pkg/utils"
"sync"
"sync/atomic"
"time"
)
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1000)
},
}
type LongConnServer interface {
Run() error
}
@ -58,6 +65,41 @@ func newWsServer(opts ...Option) (*WsServer, error) {
}, nil
}
func (ws *WsServer) Run() error {
var client *Client
go func() {
for {
select {
case client = <-ws.registerChan:
ws.registerClient(client)
case client = <-h.unregisterChan:
h.unregisterClient(client)
case msg = <-h.readChan:
h.messageHandler(msg)
}
}
}()
}
func (ws *WsServer) registerClient(client *Client) {
var (
ok bool
cli *Client
)
if cli, ok = h.clients.Get(client.key); ok == false {
h.clients.Set(client.key, client)
atomic.AddInt64(&h.onlineConnections, 1)
fmt.Println("R在线用户数量:", h.onlineConnections)
return
}
if client.onlineAt > cli.onlineAt {
h.clients.Set(client.key, client)
h.close(cli)
return
}
h.close(client)
}
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening

View File

@ -12,15 +12,13 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
type RPCServer struct {

View File

@ -3,9 +3,8 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/common/tokenverify"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"

View File

@ -3,7 +3,6 @@ package fcm
import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"context"
go_redis "github.com/go-redis/redis/v8"

View File

@ -3,7 +3,6 @@ package getui
import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"bytes"

View File

@ -15,7 +15,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/statistics"
"fmt"
)

View File

@ -3,10 +3,9 @@ package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/getcdv3"
prome "Open_IM/pkg/common/prome"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"context"

View File

@ -10,9 +10,8 @@ import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay"
pbRtc "Open_IM/pkg/proto/rtc"
@ -20,7 +19,7 @@ import (
"context"
"strings"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"github.com/golang/protobuf/proto"
)

View File

@ -1,10 +1,7 @@
package logic
import (
tpns "Open_IM/internal/push/sdk/tpns-server-sdk-go/go"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/common"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/req"
"Open_IM/pkg/common/config"
)

View File

@ -8,12 +8,10 @@ import (
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
tableRelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/getcdv3"
promePkg "Open_IM/pkg/common/prome"
pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"

View File

@ -32,7 +32,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin
AtUserIDList: msg.MsgData.AtUserIDList,
SenderFaceURL: msg.MsgData.SenderFaceURL,
Content: utils.GetContent(msg.MsgData),
Seq: msg.MsgData.Seq,
Seq: uint32(msg.MsgData.Seq),
Ex: msg.MsgData.Ex,
}
}

View File

@ -3,7 +3,6 @@ package msg
import (
"Open_IM/internal/common/notification"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"

View File

@ -1,7 +1,6 @@
package msg
import (
"Open_IM/pkg/common/db"
"time"
)

View File

@ -2,7 +2,7 @@ package msg
import (
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"

View File

@ -9,7 +9,7 @@ import (
discoveryRegistry "Open_IM/pkg/discoveryregistry"
"github.com/OpenIMSDK/openKeeper"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
"Open_IM/pkg/proto/msg"
"google.golang.org/grpc"
)

View File

@ -6,7 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
"flag"
"fmt"
"github.com/OpenIMSDK/openKeeper"

View File

@ -38,22 +38,24 @@ const (
)
type Cache interface {
IncrUserSeq(ctx context.Context, userID string) (uint64, error)
GetUserMaxSeq(ctx context.Context, userID string) (uint64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq uint64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq uint64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (uint64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error)
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
@ -61,7 +63,7 @@ type Cache interface {
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []uint32) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
@ -138,66 +140,66 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
}
// Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
return seq, err
}
// Get the largest Seq
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
// set the largest Seq
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq uint64) error {
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error {
key := userIncrSeq + uid
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
// Set the user's minimum seq
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq uint64) (err error) {
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) {
key := userMinSeq + uid
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
// Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (uint64, error) {
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) {
key := userMinSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq uint64) (err error) {
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (uint64, error) {
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) {
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (uint64, error) {
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
return seq, err
}
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq uint64) error {
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq uint32) error {
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
@ -231,7 +233,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err2 error) {
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
@ -398,7 +400,7 @@ func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) erro
return err
}
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []uint32, operationID string) {
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(context.Background(), key).Result()

View File

@ -26,6 +26,7 @@ type GroupInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
@ -91,6 +92,10 @@ func (g *GroupController) DismissGroup(ctx context.Context, groupID string) erro
return g.database.DismissGroup(ctx, groupID)
}
func (g *GroupController) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
return g.database.
}
func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
return g.database.TakeGroupMember(ctx, groupID, userID)
}
@ -182,6 +187,7 @@ type Group interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
}
type GroupMember interface {
@ -229,6 +235,8 @@ type GroupDataBaseInterface interface {
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)

View File

@ -11,7 +11,6 @@ import (
"github.com/gogo/protobuf/sortkeys"
"sync"
//"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
@ -25,30 +24,31 @@ import (
type MsgInterface interface {
// 批量插入消息到db
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
// 获取群ID或者UserID最新一条在db里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在db里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 通过seqList获取db中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在db里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/cache/db然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// SetSendMsgStatus
// GetSendMsgStatus
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
}
func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface {
@ -59,35 +59,27 @@ type MsgController struct {
database MsgDatabase
}
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq)
}
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error {
return m.database.DeleteMessageFromCache(ctx, userID, msgList)
func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error {
return m.database.DeleteMessageFromCache(ctx, sourceID, msgList)
}
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList)
}
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
return m.database.DelMsgBySeqs(ctx, userID, seqs)
}
func (m *MsgController) GetNewestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
return m.database.GetNewestMsg(ctx, ID)
}
func (m *MsgController) GetOldestMsg(ctx context.Context, ID string) (msg *sdkws.MsgData, err error) {
return m.database.GetOldestMsg(ctx, ID)
}
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetMsgBySeqs(ctx, userID, seqs)
}
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
}
@ -95,66 +87,87 @@ func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error
return m.database.CleanUpUserMsg(ctx, userID)
}
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime)
}
func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime)
}
func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
}
func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
}
func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return m.database.SetUserMinSeq(ctx, userID, minSeq)
}
type MsgDatabaseInterface interface {
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error)
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error)
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在mongo里面的消息
GetNewestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 获取群ID或者UserID最老一条在mongo里面的消息
GetOldestMsg(ctx context.Context, sourceID string) (msg *sdkws.MsgData, err error)
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/redis/mongo然后重置seq
CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
}
type MsgDatabase struct {
msgModel unRelationTb.MsgDocModelInterface
msgCache cache.Cache
msg unRelationTb.MsgDocModel
mgo unRelationTb.MsgDocModelInterface
cache cache.Cache
msg unRelationTb.MsgDocModel
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
}
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error {
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill()
if len(msgList) > db.msg.GetSingleGocMsgNum() {
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large")
}
var remain uint64
blk0 := uint64(db.msg.GetSingleGocMsgNum() - 1)
var remain int64
blk0 := db.msg.GetSingleGocMsgNum() - 1
//currentMaxSeq 4998
if currentMaxSeq < uint64(db.msg.GetSingleGocMsgNum()) {
if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
remain = blk0 - currentMaxSeq //1
} else {
excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999
remain = (uint64(db.msg.GetSingleGocMsgNum()) - (excludeBlk0 % uint64(db.msg.GetSingleGocMsgNum()))) % uint64(db.msg.GetSingleGocMsgNum())
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
}
//remain=1
insertCounter := uint64(0)
var insertCounter int64
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
docID := ""
@ -165,18 +178,18 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
currentMaxSeq++
sMsg := unRelationTb.MsgInfoModel{}
sMsg.SendTime = m.MsgData.SendTime
m.MsgData.Seq = uint32(currentMaxSeq)
m.MsgData.Seq = currentMaxSeq
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return utils.Wrap(err, "")
}
if insertCounter < remain {
msgsToMongo = append(msgsToMongo, sMsg)
insertCounter++
docID = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
docID = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else {
msgsToMongoNext = append(msgsToMongoNext, sMsg)
docIDNext = db.msg.GetDocID(sourceID, uint32(currentMaxSeq))
docIDNext = db.msg.GetDocID(sourceID, currentMaxSeq)
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
}
}
@ -185,13 +198,13 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
//filter := bson.M{"uid": seqUid}
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
err = db.msgModel.PushMsgsToDoc(ctx, docID, msgsToMongo)
err = db.mgo.PushMsgsToDoc(ctx, docID, msgsToMongo)
if err != nil {
if err == mongo.ErrNoDocuments {
doc := &unRelationTb.MsgDocModel{}
doc.DocID = docID
doc.Msg = msgsToMongo
if err = db.msgModel.Create(ctx, doc); err != nil {
if err = db.mgo.Create(ctx, doc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
@ -211,7 +224,7 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
nextDoc.DocID = docIDNext
nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
if err = db.msgModel.Create(ctx, nextDoc); err != nil {
if err = db.mgo.Create(ctx, nextDoc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
@ -223,26 +236,26 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
}
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
return db.msgCache.DeleteMessageFromCache(ctx, userID, msgs)
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
}
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (uint64, error) {
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
if lenList > db.msg.GetSingleGocMsgNum() {
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, errors.New("too large")
}
if lenList < 1 {
return 0, errors.New("too short as 0")
}
// judge sessionType to get seq
var currentMaxSeq uint64
var currentMaxSeq int64
var err error
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
currentMaxSeq, err = db.msgCache.GetGroupMaxSeq(ctx, sourceID)
currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
} else {
currentMaxSeq, err = db.msgCache.GetUserMaxSeq(ctx, sourceID)
currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, sourceID)
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
}
if err != nil && err != redis.Nil {
@ -253,11 +266,11 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
currentMaxSeq++
m.MsgData.Seq = uint32(currentMaxSeq)
m.MsgData.Seq = currentMaxSeq
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", sourceID, "seq: ", currentMaxSeq)
}
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
failedNum, err := db.msgCache.SetMessageToCache(ctx, sourceID, msgList)
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
if err != nil {
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
@ -266,9 +279,9 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
}
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = db.msgCache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
err = db.cache.SetGroupMaxSeq(ctx, sourceID, currentMaxSeq)
} else {
err = db.msgCache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
}
if err != nil {
prome.PromeInc(prome.SeqSetFailedCounter)
@ -278,14 +291,14 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
return lastMaxSeq, utils.Wrap(err, "")
}
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (totalUnExistSeqs []uint32, err error) {
sortkeys.Uint32s(seqs)
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap))
for k, v := range docIDSeqsMap {
go func(docID string, seqs []uint32) {
go func(docID string, seqs []int64) {
defer wg.Done()
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
@ -299,26 +312,26 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []u
return totalUnExistSeqs, nil
}
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (unExistSeqs []uint32, err error) {
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
for i, v := range seqMsgs {
if err = db.msgModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
if err = db.mgo.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
return nil, err
}
}
return unExistSeqs, nil
}
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []uint32) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []uint32, err error) {
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
var hasSeqList []uint32
var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
@ -344,7 +357,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
}
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetNewestMsg(ctx, sourceID)
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return nil, err
}
@ -352,7 +365,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
}
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgModel.GetOldestMsg(ctx, sourceID)
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return nil, err
}
@ -368,12 +381,12 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
return msgPb, nil
}
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []uint32, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []uint32
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []int64
singleCount := 0
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
for docID, value := range m {
doc, err := db.msgModel.FindOneByDocID(ctx, docID)
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
continue
@ -396,7 +409,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
}
}
if len(hasSeqs) != len(seqs) {
var diff []uint32
var diff []int64
var exceptionMsg []*sdkws.MsgData
diff = utils.Difference(hasSeqs, seqs)
if diffusionType == constant.WriteDiffusion {
@ -409,8 +422,8 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
return seqMsg, nil
}
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, userID, seqs)
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -430,8 +443,8 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []u
return successMsgs, nil
}
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, groupID, seqs)
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -456,7 +469,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
if err != nil {
return err
}
err = db.msgCache.CleanUpOneUserAllMsg(ctx, userID)
err = db.cache.CleanUpOneUserAllMsg(ctx, userID)
return utils.Wrap(err, "")
}
@ -471,15 +484,15 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
}
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
for _, userID := range userIDs {
userMinSeq, err := db.msgCache.GetGroupUserMinSeq(ctx, groupID, userID)
userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
if err != nil && err != redis.Nil {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
continue
}
if userMinSeq > uint64(minSeq) {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
if userMinSeq > minSeq {
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
} else {
err = db.msgCache.SetGroupUserMinSeq(ctx, groupID, userID, uint64(minSeq))
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
@ -497,16 +510,16 @@ func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID st
if minSeq == 0 {
return nil
}
return db.msgCache.SetUserMinSeq(ctx, userID, uint64(minSeq))
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}
// this is struct for recursion
type delMsgRecursionStruct struct {
minSeq uint32
minSeq int64
delDocIDList []string
}
func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
return d.minSeq
}
@ -514,9 +527,9 @@ func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (uint32, error) {
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgs, err := db.msgModel.GetMsgsByIndex(ctx, sourceID, index)
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
if err != nil || msgs.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
@ -526,14 +539,14 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
}
}
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
err = db.msgModel.Delete(ctx, delStruct.delDocIDList)
err = db.mgo.Delete(ctx, delStruct.delDocIDList)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if len(msgs.Msg) > db.msg.GetSingleGocMsgNum() {
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.NewWarn(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "msgs too large:", len(msgs.Msg), "docID:", msgs.DocID)
}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
@ -561,11 +574,11 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
msg.SendTime = 0
hasMarkDelFlag = true
} else {
if err := db.msgModel.Delete(ctx, delStruct.delDocIDList); err != nil {
if err := db.mgo.Delete(ctx, delStruct.delDocIDList); err != nil {
return 0, err
}
if hasMarkDelFlag {
if err := db.msgModel.UpdateOneDoc(ctx, msgs); err != nil {
if err := db.mgo.UpdateOneDoc(ctx, msgs); err != nil {
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
}
@ -578,3 +591,62 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime)
return seq, utils.Wrap(err, "deleteMsg failed")
}
func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
// from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
return
}
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil {
return 0, 0, err
}
minSeqMongo = msgPb.Seq
newestMsgMongo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
}
msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil {
return 0, 0, err
}
maxSeqMongo = msgPb.Seq
return
}
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}

View File

@ -67,3 +67,10 @@ func (g *GroupGorm) Search(ctx context.Context, keyword string, pageNumber, show
}()
return gormSearch[relation.GroupModel](getDBConn(g.DB, tx), []string{"name"}, keyword, pageNumber, showNumber)
}
func (g *GroupGorm) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) {
if err := g.DB.Model(&relation.GroupModel{}).Where("group_type = ? ", groupType).Pluck("group_id", &groupIDs).Error; err != nil {
return nil, utils.Wrap(err, "")
}
return groupIDs, nil
}

View File

@ -41,7 +41,7 @@ func (MsgDocModel) TableName() string {
return CChat
}
func (MsgDocModel) GetSingleGocMsgNum() int {
func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum
}
@ -59,36 +59,36 @@ func (m *MsgDocModel) IsFull() bool {
return false
}
func (m MsgDocModel) GetDocID(sourceID string, seq uint32) string {
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix)
}
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq uint32) []string {
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, uint32(i))
seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID)
}
return seqUserIDs
}
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string {
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix)
}
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix uint32) string {
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []uint32) map[string][]uint32 {
t := make(map[string][]uint32)
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok {
var temp []uint32
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
@ -108,11 +108,11 @@ func (m MsgDocModel) getMsgIndex(seq uint32) int {
return int(index)
}
func (m MsgDocModel) indexGen(sourceID string, seqSuffix uint32) string {
return sourceID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdkws.MsgData) {
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
@ -121,7 +121,7 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []uint32) (exceptionMsg []*sdk
return exceptionMsg
}
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) {
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v

View File

@ -9,7 +9,7 @@ import (
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
)
type Producer struct {

View File

@ -239,17 +239,6 @@ func NewWarn(OperationID string, args ...interface{}) {
func ShowLog(ctx context.Context) {
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
OperationID := tracelog.GetOperationID(ctx)
//if ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo).GinCtx != nil {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("api: ", t.ApiName)
//} else {
// ctxLogger.WithFields(logrus.Fields{
// "OperationID": OperationID,
// "PID": ctxLogger.Pid,
// }).Infoln("rpc: ", t.ApiName)
//}
for _, v := range *t.Funcs {
if v.Err != nil {

View File

@ -223,6 +223,13 @@ message MsgDataToModifyByMQ{
string triggerID = 3;
}
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
service msg {
//seq
@ -232,7 +239,7 @@ service msg {
//
rpc SendMsg(SendMsgReq) returns(SendMsgResp);
//
rpc DelMsgList(sdkws.DelMsgListReq) returns(sdkws.DelMsgListResp);
rpc DelMsgList(DelMsgListReq) returns(DelMsgListResp);
//
rpc DelSuperGroupMsg(DelSuperGroupMsgReq) returns(DelSuperGroupMsgResp);
//

File diff suppressed because it is too large Load Diff

View File

@ -132,12 +132,12 @@ message FriendRequest{
///////////////////////////////////base end/////////////////////////////////////
message PullMessageBySeqListReq{
string userID = 1;
repeated uint32 seqList = 3;
repeated int64 seqs = 3;
map <string, seqList>groupSeqList = 4;
}
message seqList {
repeated uint32 seqList = 1;
repeated int64 seqs = 1;
}
@ -159,12 +159,12 @@ message GetMaxAndMinSeqReq {
string userID = 2;
}
message MaxAndMinSeq{
uint32 maxSeq = 1;
uint32 minSeq = 2;
int64 maxSeq = 1;
int64 minSeq = 2;
}
message GetMaxAndMinSeqResp {
uint32 maxSeq = 1;
uint32 minSeq = 2;
int64 maxSeq = 1;
int64 minSeq = 2;
map<string, MaxAndMinSeq> groupMaxAndMinSeq = 5;
}
@ -187,7 +187,7 @@ message MsgData {
int32 msgFrom = 10;
int32 contentType = 11;
bytes content = 12;
uint32 seq = 14;
int64 seq = 14;
int64 sendTime = 15;
int64 createTime = 16;
int32 status = 17;
@ -426,7 +426,7 @@ message ConversationSetPrivateTips{
message DeleteMessageTips{
string opUserID = 1;
string userID = 2;
repeated uint32 seqList = 3;
repeated int64 seqs = 3;
}
///cms
message RequestPagination {
@ -601,13 +601,7 @@ message SignalGetTokenByRoomIDReply {
}
message DelMsgListReq{
string userID = 2;
repeated uint32 seqList = 3;
}
message DelMsgListResp{
}
message SetAppBackgroundStatusReq {
string userID = 1;
@ -642,9 +636,3 @@ message KeyValue {
int64 latestUpdateTime = 3;
}
message ResponsePagination {
int32 CurrentPage = 5;
int32 ShowNumber = 6;
}

View File

@ -57,9 +57,9 @@ func cleanUpFuncName(funcName string) string {
}
// Get the intersection of two slices
func Intersect(slice1, slice2 []uint32) []uint32 {
m := make(map[uint32]bool)
n := make([]uint32, 0)
func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]bool)
n := make([]int64, 0)
for _, v := range slice1 {
m[v] = true
}
@ -73,9 +73,9 @@ func Intersect(slice1, slice2 []uint32) []uint32 {
}
// Get the diff of two slices
func Difference(slice1, slice2 []uint32) []uint32 {
m := make(map[uint32]bool)
n := make([]uint32, 0)
func Difference(slice1, slice2 []int64) []int64 {
m := make(map[int64]bool)
n := make([]int64, 0)
inter := Intersect(slice1, slice2)
for _, v := range inter {
m[v] = true