Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

This commit is contained in:
Gordon 2022-12-01 14:55:42 +08:00
commit 6f69653a8b
11 changed files with 165 additions and 32 deletions

View File

@ -164,6 +164,21 @@ func msgListIsFull(chat *db.UserChat) bool {
return false
}
func CheckGroupUserMinSeq(operationID, groupID, userID string, diffusionType int) error {
return nil
}
func CheckUserMinSeqWithMongo(operationID, userID string, diffusionType int) error {
//var seqRedis uint64
//var err error
//if diffusionType == constant.WriteDiffusion {
// seqRedis, err = db.DB.GetUserMinSeq(ID)
//} else {
// seqRedis, err = db.DB.GetGroupUserMinSeq(ID)
//}
return nil
}
func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
var seqRedis uint64
var err error
@ -185,17 +200,10 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
if msg == nil {
return nil
}
var seqMongo uint32
msgPb := &server_api_params.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil {
return utils.Wrap(err, "")
}
seqMongo = msgPb.Seq
if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 {
log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msgPb.Status, msg.SendTime)
if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 {
log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime)
} else {
log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "seq and msg OK", "status:", msgPb.Status, msg.SendTime)
log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "seq and msg OK", "status:", msg.Status, msg.SendTime)
}
return nil
}

View File

@ -30,11 +30,8 @@ func StartCronTask(userID, workingGroupID string) {
fmt.Println("clear msg finished")
return
}
clearFunc := func() {
ClearAll()
}
c := cron.New()
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearFunc)
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
panic(err)
@ -53,7 +50,6 @@ func getCronTaskOperationID() string {
func ClearAll() {
operationID := getCronTaskOperationID()
log.NewInfo(operationID, "====================== start del cron task ======================")
//var userIDList []string
var err error
userIDList, err := im_mysql_model.SelectAllUserID()
if err == nil {
@ -61,7 +57,6 @@ func ClearAll() {
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
//return
// working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
if err == nil {
@ -82,6 +77,9 @@ func StartClearMsg(operationID string, userIDList []string) {
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
if err := CheckUserMinSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
}
@ -100,5 +98,10 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string)
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
for _, userID := range userIDList {
if err := CheckGroupUserMinSeq(operationID, groupID, userID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
}
}
}

View File

@ -15,10 +15,12 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"runtime"
"strings"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
@ -150,7 +152,8 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
return
}
msgClient := pbChat.NewMsgClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq)
maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq, maxSizeOption)
if err != nil {
log.NewError(rpcReq.OperationID, "pullMsgBySeqListReq err", err.Error())
nReply.ErrCode = 200
@ -403,7 +406,7 @@ func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
if isPass {
req := pData.(*sdk_ws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.platformID), conn.token, req.IsBackground)
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.platformID), conn.token, conn.IsBackground)
if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}

View File

@ -70,7 +70,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
wsResult = append(wsResult, reply.SinglePushResult...)
}
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData, "isOfflinePush", isOfflinePush)
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// save invitation info for offline push

View File

@ -8,6 +8,7 @@ import (
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"github.com/go-redis/redis/v8"
)

View File

@ -0,0 +1 @@
package msg

View File

@ -10,11 +10,12 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"github.com/golang/protobuf/proto"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
@ -94,8 +95,12 @@ func (rpc *rpcChat) Run() {
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.Info("", "listen network success, address ", address)
var grpcOpts []grpc.ServerOption
recvSize := 1024 * 1024 * 30
sendSize := 1024 * 1024 * 30
var grpcOpts = []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
}
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()

View File

@ -107,6 +107,17 @@ func userIsMuteInGroup(groupID, userID string) (bool, error) {
return false, nil
}
func groupIsMuted(groupID string) (bool, error) {
groupInfo, err := rocksCache.GetGroupInfoFromCache(groupID)
if err != nil {
return false, utils.Wrap(err, "GetGroupInfoFromCache failed")
}
if groupInfo.Status == constant.GroupStatusMuted {
return true, nil
}
return false, nil
}
func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) {
switch data.MsgData.SessionType {
case constant.SingleChatType:
@ -182,7 +193,15 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
return false, 202, "you are not in group", nil
}
}
isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
isMute, err := groupIsMuted(data.MsgData.GroupID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
}
if isMute {
return false, 225, "group id muted", nil
}
isMute, err = userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
@ -245,7 +264,15 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
return false, 202, "you are not in group", nil
}
}
isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
isMute, err := groupIsMuted(data.MsgData.GroupID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
}
if isMute {
return false, 225, "group id muted", nil
}
isMute, err = userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil

View File

@ -0,0 +1,49 @@
package db
type ExtendMsgSet struct {
ID string `bson:"id" json:"ID"`
ExtendMsg []*ExtendMsg `bson:"extend_msg" json:"extendMsg"`
LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"`
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"`
CreateTime int32 `bson:"create_time" json:"createTime"`
}
type ExtendMsg struct {
SendID string `bson:"send_id" json:"sendID"`
ServerMsgID string `bson:"server_msg_id" json:"serverMsgID"`
Ex string `bson:"ex" json:"ex"`
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
LikeUserIDList []string `bson:"like_user_id_list" json:"likeUserIDList"`
Content string `bson:"content" json:"content"`
ExtendMsgComments []*ExtendMsgComment `bson:"extend_msg_comments" json:"extendMsgComment"`
Vote *Vote `bson:"vote" json:"vote"`
Urls []string `bson:"urls" json:"urls"`
CreateTime int32 `bson:"create_time" json:"createTime"`
}
type Vote struct {
Content string `bson:"content" json:"content"`
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
Options []*Options `bson:"options" json:"options"`
}
type Options struct {
Content string `bson:"content" json:"content"`
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
VoteUserIDList []string `bson:"vote_user_id_list" json:"voteUserIDList"`
}
type ExtendMsgComment struct {
UserID string `bson:"user_id" json:"userID"`
ReplyUserID string `bson:"reply_user_id" json:"replyUserID"`
ReplyContentID string `bson:"reply_content_id" json:"replyContentID"`
ContentID string `bson:"content_id" json:"contentID"`
Content string `bson:"content" json:"content"`
CreateTime int32 `bson:"create_time" json:"createTime"`
AttachedInfo string `bson:"attached_info" json:"attachedInfo"`
Ex string `bson:"ex" json:"ex"`
}

View File

@ -318,7 +318,7 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error {
return err
}
func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) {
func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@ -334,13 +334,53 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) {
}
if len(userChats) > 0 {
if len(userChats[0].Msg) > 0 {
return &userChats[0].Msg[len(userChats[0].Msg)-1], nil
msgPb := &open_im_sdk.MsgData{}
err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
return msgPb, nil
}
return nil, errors.New("len(userChats[0].Msg) < 0")
}
return nil, nil
}
func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
findOpts := options.Find().SetLimit(1).SetSort(bson.M{"uid": 1})
var userChats []UserChat
cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts)
if err != nil {
return nil, err
}
err = cursor.All(ctx, &userChats)
if err != nil {
return nil, utils.Wrap(err, "")
}
var oldestMsg []byte
if len(userChats) > 0 {
for _, v := range userChats[0].Msg {
if v.SendTime != 0 {
oldestMsg = v.Msg
break
}
}
if len(oldestMsg) == 0 {
oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg
}
msgPb := &open_im_sdk.MsgData{}
err = proto.Unmarshal(oldestMsg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
return msgPb, nil
}
return nil, nil
}
func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0

View File

@ -8,7 +8,6 @@ import (
"Open_IM/pkg/utils"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"
@ -408,9 +407,6 @@ func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) {
if err != nil {
return "", utils.Wrap(err, "")
}
if len(userToSuperGroup.GroupIDList) == 0 {
return "", errors.New("GroupIDList == 0")
}
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
if err != nil {
return "", utils.Wrap(err, "")