Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

This commit is contained in:
skiffer-git 2022-12-05 17:10:15 +08:00
commit 9394d340a2
39 changed files with 4119 additions and 5660 deletions

View File

@ -2,11 +2,15 @@ package main
import (
"Open_IM/internal/cron_task"
"flag"
"fmt"
"time"
)
func main() {
fmt.Println(time.Now(), "start cronTask")
cronTask.StartCronTask()
var userID = flag.String("userID", "", "userID to clear msg and reset seq")
var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq")
flag.Parse()
fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID)
cronTask.StartCronTask(*userID, *workingGroupID)
}

View File

@ -58,6 +58,40 @@ func init() {
}
}
func GetUserToken(c *gin.Context) {
var (
req apiStruct.GetUserTokenRequest
resp apiStruct.GetUserTokenResponse
reqPb pbAdmin.GetUserTokenReq
respPb *pbAdmin.GetUserTokenResp
)
if err := c.BindJSON(&req); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
reqPb.OperationID = req.OperationID
reqPb.UserID = req.UserID
reqPb.PlatformID = req.PlatFormID
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImAdminCMSName, reqPb.OperationID)
if etcdConn == nil {
errMsg := reqPb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(reqPb.OperationID, errMsg)
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
return
}
client := pbAdmin.NewAdminCMSClient(etcdConn)
respPb, err := client.GetUserToken(context.Background(), &reqPb)
if err != nil {
log.NewError(reqPb.OperationID, utils.GetSelfFuncName(), "rpc failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()})
return
}
resp.Token = respPb.Token
resp.ExpTime = respPb.ExpTime
c.JSON(http.StatusOK, gin.H{"errCode": respPb.CommonResp.ErrCode, "errMsg": respPb.CommonResp.ErrMsg, "data": resp})
}
// register
func AdminLogin(c *gin.Context) {
var (

View File

@ -1,6 +1,7 @@
package middleware
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/utils"
@ -20,6 +21,11 @@ func JWTAuth() gin.HandlerFunc {
c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": errInfo})
return
} else {
if !utils.IsContain(userID, config.Config.Manager.AppManagerUid) {
c.Abort()
c.JSON(http.StatusOK, gin.H{"errCode": 400, "errMsg": "user is not admin"})
return
}
log.NewInfo("0", utils.GetSelfFuncName(), "failed: ", errInfo)
}
}

View File

@ -29,6 +29,8 @@ func NewGinRouter() *gin.Engine {
{
adminRouterGroup.POST("/login", admin.AdminLogin)
adminRouterGroup.Use(middleware.JWTAuth())
adminRouterGroup.POST("/get_user_token", admin.GetUserToken)
adminRouterGroup.POST("/add_user_register_add_friend_id", admin.AddUserRegisterAddFriendIDList)
adminRouterGroup.POST("/reduce_user_register_reduce_friend_id", admin.ReduceUserRegisterAddFriendIDList)
adminRouterGroup.POST("/get_user_register_reduce_friend_id_list", admin.GetUserRegisterAddFriendIDList)

View File

@ -54,7 +54,7 @@ func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error {
if minSeq == 0 {
return nil
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", delStruct, "minSeq", minSeq)
log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDStruct: ", delStruct, "minSeq", minSeq)
err = db.DB.SetUserMinSeq(userID, minSeq)
return utils.Wrap(err, "")
}
@ -82,7 +82,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() uint32 {
// index 0....19(del) 20...69
// seq 70
// set minSeq 21
// recursion
// recursion 删除list并且返回设置的最小seq
func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMsgRecursionStruct) (uint32, error) {
// find from oldest list
msgs, err := db.DB.GetUserMsgListByIndex(ID, index)
@ -105,11 +105,13 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
if len(msgs.Msg) > db.GetSingleGocMsgNum() {
log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID)
}
var hasMsgDoNotNeedDel bool
for i, msg := range msgs.Msg {
// 找到列表中不需要删除的消息了, 表示为递归到最后一个块
if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) {
log.NewDebug(operationID, ID, "find uid", msgs.UID)
// 删除块失败 递归结束 返回0
hasMsgDoNotNeedDel = true
if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil {
return 0, err
}
@ -120,7 +122,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
}
// 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。
if i > 0 {
err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1)
delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i)
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
@ -128,6 +130,10 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
}
// 递归结束
return msgPb.Seq, nil
} else {
if !msgListIsFull(msgs) {
}
}
}
// 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归
@ -139,15 +145,26 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs
}
delStruct.minSeq = lastMsgPb.Seq
if msgListIsFull(msgs) {
log.NewDebug(operationID, "msg list is full", msgs.UID)
delStruct.delUidList = append(delStruct.delUidList, msgs.UID)
} else {
// 列表没有满且没有不需要被删除的消息 代表他是最新的消息块
if !hasMsgDoNotNeedDel {
delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1)
err = delMongoMsgsPhysical(delStruct.delUidList)
if err != nil {
return delStruct.getSetMinSeq(), err
}
return delStruct.getSetMinSeq(), nil
}
}
}
log.NewDebug(operationID, ID, "continue", delStruct)
log.NewDebug(operationID, ID, "continue to", delStruct)
// 继续递归 index+1
seq, err := deleteMongoMsg(operationID, ID, index+1, delStruct)
if err != nil {
return seq, utils.Wrap(err, "deleteMongoMsg failed")
}
return seq, nil
return seq, utils.Wrap(err, "deleteMongoMsg failed")
}
func msgListIsFull(chat *db.UserChat) bool {
@ -163,6 +180,14 @@ func msgListIsFull(chat *db.UserChat) bool {
return false
}
func CheckGroupUserMinSeq(operationID, groupID, userID string) error {
return nil
}
func CheckUserMinSeqWithMongo(operationID, userID string) error {
return nil
}
func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
var seqRedis uint64
var err error
@ -184,17 +209,10 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
if msg == nil {
return nil
}
var seqMongo uint32
msgPb := &server_api_params.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil {
return utils.Wrap(err, "")
}
seqMongo = msgPb.Seq
if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 {
log.NewWarn(operationID, utils.GetSelfFuncName(), seqMongo, seqRedis, "redis maxSeq is different with msg.Seq > 10", ID, diffusionType)
if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 {
log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime)
} else {
log.NewInfo(operationID, utils.GetSelfFuncName(), diffusionType, ID, "seq and msg OK", seqMongo, seqRedis)
log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, ID, "seq and msg OK", "status:", msg.Status, msg.SendTime)
}
return nil
}

View File

@ -14,60 +14,28 @@ import (
const cronTaskOperationID = "cronTaskOperationID-"
func StartCronTask() {
func StartCronTask(userID, workingGroupID string) {
log.NewPrivateLog("cron")
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
c := cron.New()
fmt.Println("cron config", config.Config.Mongo.ChatRecordsClearTime)
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, func() {
// user msg clear
if userID != "" {
operationID := getCronTaskOperationID()
log.NewInfo(operationID, "====================== start del cron task ======================")
userIDList, err := im_mysql_model.SelectAllUserID()
if err == nil {
log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
userIDList = []string{"4158779020"}
for _, userID := range userIDList {
if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID)
}
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
//return
// working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
if err == nil {
log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList)
for _, groupID := range workingGroupIDList {
userIDList, err = rocksCache.GetGroupMemberIDListFromCache(groupID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID)
continue
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList)
if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
}
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
log.NewInfo(operationID, "====================== start del cron finished ======================")
})
StartClearMsg(operationID, []string{userID})
}
if workingGroupID != "" {
operationID := getCronTaskOperationID()
StartClearWorkingGroupMsg(operationID, []string{workingGroupID})
}
if userID != "" || workingGroupID != "" {
fmt.Println("clear msg finished")
return
}
c := cron.New()
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
panic(err)
}
c.Start()
fmt.Println("start cron task success")
for {
@ -78,3 +46,62 @@ func StartCronTask() {
func getCronTaskOperationID() string {
return cronTaskOperationID + utils.OperationIDGenerator()
}
func ClearAll() {
operationID := getCronTaskOperationID()
log.NewInfo(operationID, "====================== start del cron task ======================")
var err error
userIDList, err := im_mysql_model.SelectAllUserID()
if err == nil {
StartClearMsg(operationID, userIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
// working group msg clear
workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup)
if err == nil {
StartClearWorkingGroupMsg(operationID, workingGroupIDList)
} else {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
log.NewInfo(operationID, "====================== start del cron finished ======================")
}
func StartClearMsg(operationID string, userIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
for _, userID := range userIDList {
if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID)
}
if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), userID, err)
}
}
}
func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) {
log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList)
for _, groupID := range workingGroupIDList {
userIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID)
continue
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "workingGroupIDList:", userIDList)
if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
for _, userID := range userIDList {
if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), groupID, err)
}
}
}
}

View File

@ -9,7 +9,7 @@ import (
"time"
)
func callbackUserOnline(operationID, userID string, platformID int, token string) cbApi.CommonCallbackResp {
func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOnline.Enable {
return callbackResp
@ -25,7 +25,8 @@ func callbackUserOnline(operationID, userID string, platformID int, token string
},
UserID: userID,
},
Seq: int(time.Now().UnixNano() / 1e6),
Seq: int(time.Now().UnixNano() / 1e6),
IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged,
}
callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOnlineCommand, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil {
@ -35,7 +36,7 @@ func callbackUserOnline(operationID, userID string, platformID int, token string
return callbackResp
}
func callbackUserOffline(operationID, userID string, platformID int) cbApi.CommonCallbackResp {
func callbackUserOffline(operationID, userID string, platformID int, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOffline.Enable {
return callbackResp
@ -50,7 +51,8 @@ func callbackUserOffline(operationID, userID string, platformID int) cbApi.Commo
},
UserID: userID,
},
Seq: int(time.Now().UnixNano() / 1e6),
Seq: int(time.Now().UnixNano() / 1e6),
IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged,
}
callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOfflineCommand, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil {

View File

@ -15,10 +15,12 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"runtime"
"strings"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
@ -65,6 +67,9 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
case constant.WsLogoutMsg:
log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.userLogoutReq(conn, &m)
case constant.WsSetBackgroundStatus:
log.NewInfo(m.OperationID, "WsSetBackgroundStatus", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.setUserDeviceBackground(conn, &m)
default:
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
}
@ -147,7 +152,8 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
return
}
msgClient := pbChat.NewMsgClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq)
maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq, maxSizeOption)
if err != nil {
log.NewError(rpcReq.OperationID, "pullMsgBySeqListReq err", err.Error())
nReply.ErrCode = 200
@ -394,3 +400,36 @@ func SetTokenKicked(userID string, platformID int, operationID string) {
return
}
}
func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WsSetBackgroundStatus, m.OperationID)
if isPass {
req := pData.(*sdk_ws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground
if !conn.IsBackground {
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, true)
if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
} else {
callbackResp := callbackUserOffline(m.OperationID, conn.userID, int(conn.PlatformID), true)
if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
}
log.NewInfo(m.OperationID, "SetUserDeviceBackground", "success", *conn, req.IsBackground)
}
ws.setUserDeviceBackgroundResp(conn, m, errCode, errMsg)
}
func (ws *WServer) setUserDeviceBackgroundResp(conn *UserConn, m *Req, errCode int32, errMsg string) {
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
OperationID: m.OperationID,
ErrCode: errCode,
ErrMsg: errMsg,
}
ws.sendMsg(conn, mReply)
}

View File

@ -196,24 +196,27 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode,
RecvID: v,
RecvPlatFormID: int32(platform),
temp := &pbRelay.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(platform),
}
if !userConn.IsBackground {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
}
} else {
temp.ResultCode = -2
resp = append(resp, temp)
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{

View File

@ -107,6 +107,18 @@ func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass boo
}
return true, 0, "", data
case constant.WsSetBackgroundStatus:
data := open_im_sdk.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", &data
default:
}
return false, 204, "args err", nil

View File

@ -31,11 +31,14 @@ import (
type UserConn struct {
*websocket.Conn
w *sync.Mutex
platformID int32
PlatformID int32
PushedMaxSeq uint32
IsCompress bool
userID string
IsBackground bool
token string
}
type WServer struct {
wsAddr string
wsMaxConnNum int
@ -73,18 +76,13 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
operationID = utils.OperationIDGenerator()
}
log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query)
if ws.headerCheck(w, r, operationID) {
if isPass, compression := ws.headerCheck(w, r, operationID); isPass {
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
if err != nil {
log.Error(operationID, "upgrade http conn err", err.Error(), query)
return
} else {
var isCompress = false
if r.Header.Get("compression") == "gzip" {
log.NewDebug(operationID, query["sendID"][0], "enable compression")
isCompress = true
}
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, isCompress, query["sendID"][0]}
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0]}
userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
go ws.readMsg(newConn)
@ -221,7 +219,7 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int,
return
}
err = oldConn.Close()
delete(oldConnMap, platformID)
//delete(oldConnMap, platformID)
ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid)
@ -325,7 +323,7 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token
rwLock.Lock()
defer rwLock.Unlock()
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String())
callbackResp := callbackUserOnline(operationID, uid, platformID, token)
callbackResp := callbackUserOnline(operationID, uid, platformID, token, false)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
}
@ -363,7 +361,7 @@ func (ws *WServer) delUserConn(conn *UserConn) {
operationID := utils.OperationIDGenerator()
var uid string
var platform int
if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
if oldStringMap, okg := ws.wsConnToUser[conn]; okg {
for k, v := range oldStringMap {
platform = k
uid = v
@ -383,17 +381,17 @@ func (ws *WServer) delUserConn(conn *UserConn) {
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
}
delete(ws.wsConnToUser, conn)
}
err := conn.Close()
if err != nil {
log.Error(operationID, " close err", "", "uid", uid, "platform", platform)
}
callbackResp := callbackUserOffline(operationID, uid, platform)
callbackResp := callbackUserOffline(operationID, conn.userID, platform, false)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
}
promePkg.PromeGaugeDec(promePkg.OnlineUserGauge)
}
func (ws *WServer) getUserConn(uid string, platform int) *UserConn {
@ -432,7 +430,7 @@ func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
// }
// return "", 0
//}
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool {
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) (isPass, compression bool) {
status := http.StatusUnauthorized
query := r.URL.Query()
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
@ -484,10 +482,16 @@ func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operation
w.Header().Set("Sec-Websocket-Version", "13")
w.Header().Set("ws_err_msg", err.Error())
http.Error(w, err.Error(), status)
return false
return false, false
} else {
log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0])
return true
if r.Header.Get("compression") == "gzip" {
compression = true
}
if len(query["compression"]) != 0 && query["compression"][0] == "gzip" {
compression = true
}
log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0], "compression", compression)
return true, compression
}
} else {
status = int(constant.ErrArgs.ErrCode)
@ -496,6 +500,6 @@ func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operation
errMsg := "args err, need token, sendID, platformID"
w.Header().Set("ws_err_msg", errMsg)
http.Error(w, errMsg, status)
return false
return false, false
}
}

View File

@ -125,6 +125,7 @@ func callbackBeforeSuperGroupOnlinePush(operationID string, groupID string, msg
SessionType: msg.SessionType,
AtUserIDList: msg.AtUserIDList,
Content: callback.GetContent(msg),
Seq: msg.Seq,
}
resp := &cbApi.CallbackBeforeSuperGroupOnlinePushResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackSuperGroupOnlinePushCommand, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.CallbackTimeOut); err != nil {

View File

@ -70,7 +70,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
wsResult = append(wsResult, reply.SinglePushResult...)
}
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData, "isOfflinePush", isOfflinePush)
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// save invitation info for offline push

View File

@ -133,6 +133,24 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi
return resp, nil
}
func (s *adminCMSServer) GetUserToken(_ context.Context, req *pbAdminCMS.GetUserTokenReq) (*pbAdminCMS.GetUserTokenResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbAdminCMS.GetUserTokenResp{
CommonResp: &pbAdminCMS.CommonResp{},
}
token, expTime, err := token_verify.CreateToken(req.UserID, int(req.PlatformID))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "generate token failed", "userID: ", req.UserID, req.PlatformID, err.Error())
resp.CommonResp.ErrCode = constant.ErrTokenUnknown.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.Token = token
resp.ExpTime = expTime
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", resp.String())
return resp, nil
}
func (s *adminCMSServer) AddUserRegisterAddFriendIDList(_ context.Context, req *pbAdminCMS.AddUserRegisterAddFriendIDListReq) (*pbAdminCMS.AddUserRegisterAddFriendIDListResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbAdminCMS.AddUserRegisterAddFriendIDListResp{CommonResp: &pbAdminCMS.CommonResp{}}
@ -197,37 +215,50 @@ func (s *adminCMSServer) GetUserRegisterAddFriendIDList(_ context.Context, req *
func (s *adminCMSServer) GetChatLogs(_ context.Context, req *pbAdminCMS.GetChatLogsReq) (*pbAdminCMS.GetChatLogsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "GetChatLogs", req.String())
resp := &pbAdminCMS.GetChatLogsResp{CommonResp: &pbAdminCMS.CommonResp{}, Pagination: &server_api_params.ResponsePagination{}}
time, err := utils.TimeStringToTime(req.SendTime)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "time string parse error", err.Error())
resp.CommonResp.ErrCode = constant.ErrArgs.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
chatLog := db.ChatLog{
Content: req.Content,
SendTime: time,
ContentType: req.ContentType,
SessionType: req.SessionType,
RecvID: req.RecvID,
SendID: req.SendID,
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "chat_log: ", chatLog)
nums, err := imdb.GetChatLogCount(chatLog)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLogCount", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
if req.SendTime != "" {
sendTime, err := utils.TimeStringToTime(req.SendTime)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "time string parse error", err.Error())
resp.CommonResp.ErrCode = constant.ErrArgs.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
chatLog.SendTime = sendTime
}
resp.ChatLogsNum = int32(nums)
chatLogs, err := imdb.GetChatLog(chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber)
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "chat_log: ", chatLog)
num, chatLogs, err := imdb.GetChatLog(&chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber, []int32{
constant.Text,
constant.Picture,
constant.Voice,
constant.Video,
constant.File,
constant.AtText,
constant.Merger,
constant.Card,
constant.Location,
constant.Custom,
constant.Revoke,
constant.Quote,
constant.AdvancedText,
constant.AdvancedRevoke,
constant.CustomNotTriggerConversation,
})
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLog", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.ChatLogsNum = int32(num)
for _, chatLog := range chatLogs {
pbChatLog := &pbAdminCMS.ChatLog{}
utils.CopyStructFields(pbChatLog, chatLog)
@ -265,7 +296,6 @@ func (s *adminCMSServer) GetChatLogs(_ context.Context, req *pbAdminCMS.GetChatL
CurrentPage: req.Pagination.PageNumber,
ShowNumber: req.Pagination.ShowNumber,
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp output: ", resp.String())
return resp, nil
}

View File

@ -50,6 +50,11 @@ func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq)
func (rpc *rpcAuth) UserToken(_ context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String())
_, err := imdb.GetUserByUserID(req.FromUserID)
if err != nil {
log.NewError(req.OperationID, "not this user:", req.FromUserID, req.String())
return &pbAuth.UserTokenResp{CommonResp: &pbAuth.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}}, nil
}
tokens, expTime, err := token_verify.CreateToken(req.FromUserID, int(req.Platform))
if err != nil {
errMsg := req.OperationID + " token_verify.CreateToken failed " + err.Error() + req.FromUserID + utils.Int32ToString(req.Platform)

View File

@ -20,6 +20,7 @@ func callbackBeforeCreateGroup(req *pbGroup.CreateGroupReq) cbApi.CommonCallback
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), req.String())
commonCallbackReq := &cbApi.CallbackBeforeCreateGroupReq{
CallbackCommand: constant.CallbackBeforeCreateGroupCommand,
OperationID: req.OperationID,
GroupInfo: *req.GroupInfo,
InitMemberList: req.InitMemberList,
}
@ -88,6 +89,7 @@ func CallbackBeforeMemberJoinGroup(operationID string, groupMember *db.GroupMemb
log.NewDebug(operationID, "args: ", *groupMember)
callbackReq := cbApi.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand,
OperationID: operationID,
GroupID: groupMember.GroupID,
UserID: groupMember.UserID,
Ex: groupMember.Ex,

View File

@ -178,13 +178,13 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
utils.CopyStructFields(&groupMember, us)
callbackResp := CallbackBeforeMemberJoinGroup(req.OperationID, &groupMember, groupInfo.Ex)
if callbackResp.ErrCode != 0 {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
log.NewError(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup result", "end rpc and return", callbackResp)
return &pbGroup.CreateGroupResp{
ErrCode: int32(callbackResp.ErrCode),
ErrMsg: callbackResp.ErrMsg,
@ -450,66 +450,6 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
}
resp.Id2ResultList = append(resp.Id2ResultList, &resultNode)
}
var haveConUserID []string
conversations, err := imdb.GetConversationsByConversationIDMultipleOwner(okUserIDList, utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType))
if err != nil {
log.NewError(req.OperationID, "GetConversationsByConversationIDMultipleOwner failed ", err.Error(), req.GroupID, constant.GroupChatType)
}
for _, v := range conversations {
haveConUserID = append(haveConUserID, v.OwnerUserID)
}
var reqPb pbUser.SetConversationReq
var c pbConversation.Conversation
for _, v := range conversations {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v.OwnerUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType)
c.RecvMsgOpt = v.RecvMsgOpt
c.ConversationType = constant.GroupChatType
c.GroupID = req.GroupID
c.IsPinned = v.IsPinned
c.AttachedInfo = v.AttachedInfo
c.IsPrivateChat = v.IsPrivateChat
c.GroupAtType = v.GroupAtType
c.IsNotInGroup = false
c.Ex = v.Ex
reqPb.Conversation = &c
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error(), v.OwnerUserID)
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String(), v.OwnerUserID)
}
}
for _, v := range utils.DifferenceString(haveConUserID, okUserIDList) {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType)
c.ConversationType = constant.GroupChatType
c.GroupID = req.GroupID
c.IsNotInGroup = false
reqPb.Conversation = &c
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error(), v)
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String(), v)
}
}
} else {
okUserIDList = req.InvitedUserIDList
if err := db.DB.AddUserToSuperGroup(req.GroupID, req.InvitedUserIDList); err != nil {
@ -517,6 +457,76 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
}
}
// set conversations
var haveConUserID []string
var sessionType int
if groupInfo.GroupType == constant.NormalGroup {
sessionType = constant.GroupChatType
} else {
sessionType = constant.SuperGroupChatType
}
conversations, err := imdb.GetConversationsByConversationIDMultipleOwner(okUserIDList, utils.GetConversationIDBySessionType(req.GroupID, sessionType))
if err != nil {
log.NewError(req.OperationID, "GetConversationsByConversationIDMultipleOwner failed ", err.Error(), req.GroupID, sessionType)
}
for _, v := range conversations {
haveConUserID = append(haveConUserID, v.OwnerUserID)
}
var reqPb pbUser.SetConversationReq
var c pbConversation.Conversation
for _, v := range conversations {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v.OwnerUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, sessionType)
c.RecvMsgOpt = v.RecvMsgOpt
c.ConversationType = int32(sessionType)
c.GroupID = req.GroupID
c.IsPinned = v.IsPinned
c.AttachedInfo = v.AttachedInfo
c.IsPrivateChat = v.IsPrivateChat
c.GroupAtType = v.GroupAtType
c.IsNotInGroup = false
c.Ex = v.Ex
reqPb.Conversation = &c
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error(), v.OwnerUserID)
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String(), v.OwnerUserID)
}
}
for _, v := range utils.DifferenceString(haveConUserID, okUserIDList) {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, sessionType)
c.ConversationType = int32(sessionType)
c.GroupID = req.GroupID
c.IsNotInGroup = false
c.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
reqPb.Conversation = &c
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error(), v)
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String(), v)
}
}
if groupInfo.GroupType != constant.SuperGroup {
chat.MemberInvitedNotification(req.OperationID, req.GroupID, req.OpUserID, req.Reason, okUserIDList)
} else {
@ -881,21 +891,28 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
log.NewError(req.OperationID, "GroupApplicationResponse failed ", err.Error(), member)
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
var sessionType int
if groupInfo.GroupType == constant.NormalGroup {
sessionType = constant.GroupChatType
} else {
sessionType = constant.SuperGroupChatType
}
var reqPb pbUser.SetConversationReq
reqPb.OperationID = req.OperationID
var c pbConversation.Conversation
conversation, err := imdb.GetConversation(req.FromUserID, utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType))
conversation, err := imdb.GetConversation(req.FromUserID, utils.GetConversationIDBySessionType(req.GroupID, sessionType))
if err != nil {
c.OwnerUserID = req.FromUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType)
c.ConversationType = constant.GroupChatType
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, sessionType)
c.ConversationType = int32(sessionType)
c.GroupID = req.GroupID
c.IsNotInGroup = false
c.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
} else {
c.OwnerUserID = conversation.OwnerUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType)
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, sessionType)
c.RecvMsgOpt = conversation.RecvMsgOpt
c.ConversationType = constant.GroupChatType
c.ConversationType = int32(sessionType)
c.GroupID = req.GroupID
c.IsPinned = conversation.IsPinned
c.AttachedInfo = conversation.AttachedInfo
@ -1012,6 +1029,37 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
log.NewError(req.OperationID, "InsertIntoGroupMember failed ", err.Error(), groupMember)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//}
var sessionType int
if groupInfo.GroupType == constant.NormalGroup {
sessionType = constant.GroupChatType
} else {
sessionType = constant.SuperGroupChatType
}
var reqPb pbUser.SetConversationReq
var c pbConversation.Conversation
reqPb.OperationID = req.OperationID
c.OwnerUserID = req.OpUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, sessionType)
c.ConversationType = int32(sessionType)
c.GroupID = req.GroupID
c.IsNotInGroup = false
c.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
reqPb.Conversation = &c
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: errMsg}}, nil
}
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error())
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String())
}
chat.MemberEnterDirectlyNotification(req.GroupID, req.OpUserID, req.OperationID)
log.NewInfo(req.OperationID, "JoinGroup rpc return ")
@ -1035,7 +1083,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
//if err != nil {
// log.NewError(req.OperationID, "GetGroupMemberListByGroupIDAndRoleLevel failed ", err.Error(), req.GroupID, constant.GroupOwner)
// return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
//}
chat.JoinGroupApplicationNotification(req)
log.NewInfo(req.OperationID, "JoinGroup rpc return ")
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil

View File

@ -8,6 +8,7 @@ import (
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"github.com/go-redis/redis/v8"
)

View File

@ -0,0 +1 @@
package msg

View File

@ -10,11 +10,12 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"github.com/golang/protobuf/proto"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
@ -94,8 +95,12 @@ func (rpc *rpcChat) Run() {
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.Info("", "listen network success, address ", address)
var grpcOpts []grpc.ServerOption
recvSize := 1024 * 1024 * 30
sendSize := 1024 * 1024 * 30
var grpcOpts = []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
}
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()

View File

@ -96,12 +96,24 @@ func isMessageHasReadEnabled(pb *pbChat.SendMsgReq) (bool, int32, string) {
return true, 0, ""
}
func userIsMuteInGroup(groupID, userID string) (bool, error) {
func userIsMuteAndIsAdminInGroup(groupID, userID string) (isMute bool, isAdmin bool, err error) {
groupMemberInfo, err := rocksCache.GetGroupMemberInfoFromCache(groupID, userID)
if err != nil {
return false, utils.Wrap(err, "")
return false, false, utils.Wrap(err, "")
}
if groupMemberInfo.MuteEndTime.Unix() >= time.Now().Unix() {
return true, groupMemberInfo.RoleLevel > constant.GroupOrdinaryUsers, nil
}
return false, groupMemberInfo.RoleLevel > constant.GroupOrdinaryUsers, nil
}
func groupIsMuted(groupID string) (bool, error) {
groupInfo, err := rocksCache.GetGroupInfoFromCache(groupID)
if err != nil {
return false, utils.Wrap(err, "GetGroupInfoFromCache failed")
}
if groupInfo.Status == constant.GroupStatusMuted {
return true, nil
}
return false, nil
@ -182,7 +194,7 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
return false, 202, "you are not in group", nil
}
}
isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
isMute, isAdmin, err := userIsMuteAndIsAdminInGroup(data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
@ -190,6 +202,17 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
if isMute {
return false, 224, "you are muted", nil
}
if isAdmin {
return true, 0, "", userIDList
}
isMute, err = groupIsMuted(data.MsgData.GroupID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
}
if isMute {
return false, 225, "group id muted", nil
}
return true, 0, "", userIDList
case constant.SuperGroupChatType:
groupInfo, err := rocksCache.GetGroupInfoFromCache(data.MsgData.GroupID)
@ -245,7 +268,7 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
return false, 202, "you are not in group", nil
}
}
isMute, err := userIsMuteInGroup(data.MsgData.GroupID, data.MsgData.SendID)
isMute, isAdmin, err := userIsMuteAndIsAdminInGroup(data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
@ -253,6 +276,17 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
if isMute {
return false, 224, "you are muted", nil
}
if isAdmin {
return true, 0, "", userIDList
}
isMute, err = groupIsMuted(data.MsgData.GroupID)
if err != nil {
errMsg := data.OperationID + err.Error()
return false, 223, errMsg, nil
}
if isMute {
return false, 225, "group id muted", nil
}
return true, 0, "", userIDList
}
default:

View File

@ -7,6 +7,7 @@ import (
type CallbackBeforeCreateGroupReq struct {
CallbackCommand string `json:"callbackCommand"`
OperationID string `json:"operationID"`
commonPb.GroupInfo
InitMemberList []*group.GroupAddMemberInfo `json:"initMemberList"`
}
@ -30,6 +31,7 @@ type CallbackBeforeCreateGroupResp struct {
type CallbackBeforeMemberJoinGroupReq struct {
CallbackCommand string `json:"callbackCommand"`
OperationID string `json:"operationID"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
Ex string `json:"ex"`

View File

@ -2,8 +2,9 @@ package call_back_struct
type CallbackUserOnlineReq struct {
UserStatusCallbackReq
Token string `json:"token"`
Seq int `json:"seq"`
Token string `json:"token"`
Seq int `json:"seq"`
IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"`
}
type CallbackUserOnlineResp struct {
@ -12,7 +13,8 @@ type CallbackUserOnlineResp struct {
type CallbackUserOfflineReq struct {
UserStatusCallbackReq
Seq int `json:"seq"`
Seq int `json:"seq"`
IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"`
}
type CallbackUserOfflineResp struct {

View File

@ -30,6 +30,7 @@ type CallbackBeforeSuperGroupOnlinePushReq struct {
SessionType int32 `json:"sessionType"`
AtUserIDList []string `json:"atUserIDList"`
Content string `json:"content"`
Seq uint32 `json:"seq"`
}
type CallbackBeforeSuperGroupOnlinePushResp struct {

View File

@ -17,6 +17,17 @@ type AdminLoginResponse struct {
FaceURL string `json:"faceURL"`
}
type GetUserTokenRequest struct {
UserID string `json:"userID" binding:"required"`
OperationID string `json:"operationID" binding:"required"`
PlatFormID int32 `json:"platformID" binding:"required"`
}
type GetUserTokenResponse struct {
Token string `json:"token"`
ExpTime int64 `json:"expTime"`
}
type AddUserRegisterAddFriendIDListRequest struct {
OperationID string `json:"operationID" binding:"required"`
UserIDList []string `json:"userIDList" binding:"required"`

View File

@ -17,14 +17,15 @@ const (
RefuseFriendFlag = -1
//Websocket Protocol
WSGetNewestSeq = 1001
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSSendSignalMsg = 1004
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003
WSDataError = 3001
WSGetNewestSeq = 1001
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSSendSignalMsg = 1004
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003
WsSetBackgroundStatus = 2004
WSDataError = 3001
///ContentType
//UserRelated

View File

@ -33,6 +33,7 @@ var (
ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"}
ErrMessageHasReadDisable = ErrInfo{ErrCode: 811, ErrMsg: "message has read disable"}
ErrInternal = ErrInfo{ErrCode: 812, ErrMsg: "internal error"}
ErrWsConnNotExist = ErrInfo{ErrCode: 813, ErrMsg: "ws conn not exist"}
)
var (

View File

@ -0,0 +1,97 @@
package db
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/utils"
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"strconv"
"time"
"go.mongodb.org/mongo-driver/bson"
)
const cExtendMsgSet = "extend_msg_set"
type ExtendMsgSet struct {
ID string `bson:"id" json:"ID"`
ExtendMsgs []*ExtendMsg `bson:"extend_msgs" json:"extendMsgs"`
LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"`
AttachedInfo *string `bson:"attached_info" json:"attachedInfo"`
Ex *string `bson:"ex" json:"ex"`
ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"`
CreateTime int32 `bson:"create_time" json:"createTime"`
}
type ReactionExtendMsgSet struct {
TypeKey string `bson:"type_key" json:"typeKey"`
Value string `bson:"value" json:"value"`
}
type ExtendMsg struct {
Content []*ReactionExtendMsgSet `bson:"content" json:"content"`
ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"`
CreateTime int32 `bson:"create_time" json:"createTime"`
}
func GetExtendMsgSetID(ID string, index int32) string {
return ID + ":" + strconv.Itoa(int(index))
}
func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.InsertOne(ctx, set)
return err
}
func (d *DataBases) GetAllExtendMsgSet(ID string) (sets []*ExtendMsgSet, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
regex := fmt.Sprintf("^%s", ID)
cursor, err := c.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}})
if err != nil {
return nil, utils.Wrap(err, "")
}
err = cursor.All(context.Background(), &sets)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
return sets, nil
}
type GetExtendMsgSetOpts struct {
IncludeExtendMsgs bool
}
func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSetOpts) (*ExtendMsgSet, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
var set ExtendMsgSet
err := c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}).Decode(&set)
return &set, err
}
func (d *DataBases) InsertExtendMsg(ID string, index int32, msg *ExtendMsg) (msgIndex int32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
result := c.FindOneAndUpdate(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"create_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, "$push": bson.M{"extend_msgs": msg}}})
set := &ExtendMsgSet{}
err = result.Decode(set)
return set.ExtendMsgNum, err
}
func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, msg *ExtendMsg, msgSet *ExtendMsgSet) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{})
return err
}
func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}).Decode(&extendMsgList)
return extendMsgList, err
}

View File

@ -291,13 +291,13 @@ func (d *DataBases) DelMongoMsgs(IDList []string) error {
return err
}
func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error {
func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
userChat := &UserChat{}
err := c.FindOne(ctx, bson.M{"uid": suffixID}).Decode(&userChat)
err = c.FindOne(ctx, bson.M{"uid": suffixID}).Decode(&userChat)
if err != nil {
return err
return 0, err
}
for i, msg := range userChat.Msg {
if i <= index {
@ -312,13 +312,14 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error {
}
msg.Msg = bytes
msg.SendTime = 0
replaceMaxSeq = msgPb.Seq
}
}
_, err = c.UpdateOne(ctx, bson.M{"uid": suffixID}, bson.M{"$set": bson.M{"msg": userChat.Msg}})
return err
return replaceMaxSeq, err
}
func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) {
func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
@ -334,13 +335,53 @@ func (d *DataBases) GetNewestMsg(ID string) (msg *MsgInfo, err error) {
}
if len(userChats) > 0 {
if len(userChats[0].Msg) > 0 {
return &userChats[0].Msg[len(userChats[0].Msg)-1], nil
msgPb := &open_im_sdk.MsgData{}
err = proto.Unmarshal(userChats[0].Msg[len(userChats[0].Msg)-1].Msg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
return msgPb, nil
}
return nil, errors.New("len(userChats[0].Msg) < 0")
}
return nil, nil
}
func (d *DataBases) GetOldestMsg(ID string) (msg *open_im_sdk.MsgData, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
regex := fmt.Sprintf("^%s", ID)
findOpts := options.Find().SetLimit(1).SetSort(bson.M{"uid": 1})
var userChats []UserChat
cursor, err := c.Find(ctx, bson.M{"uid": bson.M{"$regex": regex}}, findOpts)
if err != nil {
return nil, err
}
err = cursor.All(ctx, &userChats)
if err != nil {
return nil, utils.Wrap(err, "")
}
var oldestMsg []byte
if len(userChats) > 0 {
for _, v := range userChats[0].Msg {
if v.SendTime != 0 {
oldestMsg = v.Msg
break
}
}
if len(oldestMsg) == 0 {
oldestMsg = userChats[0].Msg[len(userChats[0].Msg)-1].Msg
}
msgPb := &open_im_sdk.MsgData{}
err = proto.Unmarshal(oldestMsg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
return msgPb, nil
}
return nil, nil
}
func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0

View File

@ -3,65 +3,42 @@ package im_mysql_model
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"fmt"
)
func GetChatLog(chatLog db.ChatLog, pageNumber, showNumber int32) ([]db.ChatLog, error) {
var chatLogs []db.ChatLog
db := db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").
Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1)))
func GetChatLog(chatLog *db.ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []db.ChatLog, error) {
mdb := db.DB.MysqlDB.DefaultGormDB().Table("chat_logs")
if chatLog.SendTime.Unix() > 0 {
db = db.Where("send_time > ? and send_time < ?", chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1))
mdb = mdb.Where("send_time > ? and send_time < ?", chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1))
}
if chatLog.Content != "" {
db = db.Where(" content like ? ", fmt.Sprintf("%%%s%%", chatLog.Content))
mdb = mdb.Where(" content like ? ", fmt.Sprintf("%%%s%%", chatLog.Content))
}
if chatLog.SessionType == 1 {
db = db.Where("session_type = ?", chatLog.SessionType)
mdb = mdb.Where("session_type = ?", chatLog.SessionType)
} else if chatLog.SessionType == 2 {
db = db.Where("session_type in (?)", []int{constant.GroupChatType, constant.SuperGroupChatType})
mdb = mdb.Where("session_type in (?)", []int{constant.GroupChatType, constant.SuperGroupChatType})
}
if chatLog.ContentType != 0 {
db = db.Where("content_type = ?", chatLog.ContentType)
mdb = mdb.Where("content_type = ?", chatLog.ContentType)
}
if chatLog.SendID != "" {
db = db.Where("send_id = ?", chatLog.SendID)
mdb = mdb.Where("send_id = ?", chatLog.SendID)
}
if chatLog.RecvID != "" {
db = db.Where("recv_id = ?", chatLog.RecvID)
mdb = mdb.Where("recv_id = ?", chatLog.RecvID)
}
if len(contentTypeList) > 0 {
mdb = mdb.Where("content_type in (?)", contentTypeList)
}
err := db.Find(&chatLogs).Error
return chatLogs, err
}
func GetChatLogCount(chatLog db.ChatLog) (int64, error) {
var count int64
db := db.DB.MysqlDB.DefaultGormDB().Table("chat_logs")
if chatLog.SendTime.Unix() > 0 {
log.NewDebug("", utils.GetSelfFuncName(), chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1))
db = db.Where("send_time > ? and send_time < ?", chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1))
if err := mdb.Count(&count).Error; err != nil {
return 0, nil, err
}
if chatLog.Content != "" {
db = db.Where(" content like ? ", fmt.Sprintf("%%%s%%", chatLog.Content))
var chatLogs []db.ChatLog
mdb = mdb.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1)))
if err := mdb.Find(&chatLogs).Error; err != nil {
return 0, nil, err
}
if chatLog.SessionType == 1 {
db = db.Where("session_type = ?", chatLog.SessionType)
} else if chatLog.SessionType == 2 {
db = db.Where("session_type in (?)", []int{constant.GroupChatType, constant.SuperGroupChatType})
}
if chatLog.ContentType != 0 {
db = db.Where("content_type = ?", chatLog.ContentType)
}
if chatLog.SendID != "" {
db = db.Where("send_id = ?", chatLog.SendID)
}
if chatLog.RecvID != "" {
db = db.Where("recv_id = ?", chatLog.RecvID)
}
err := db.Count(&count).Error
return count, err
return count, chatLogs, nil
}

View File

@ -8,7 +8,6 @@ import (
"Open_IM/pkg/utils"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"
@ -34,6 +33,8 @@ const (
groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:"
conversationCache = "CONVERSATION_CACHE:"
conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:"
extendMsgSetCache = "EXTEND_MSG_SET_CACHE:"
extendMsgCache = "EXTEND_MSG_CACHE:"
)
func DelKeys() {
@ -408,9 +409,6 @@ func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) {
if err != nil {
return "", utils.Wrap(err, "")
}
if len(userToSuperGroup.GroupIDList) == 0 {
return "", errors.New("GroupIDList == 0")
}
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
if err != nil {
return "", utils.Wrap(err, "")
@ -568,3 +566,63 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
func DelConversationFromCache(ownerUserID, conversationID string) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
}
func GetExtendMsgSetFromCache(ID string, index int32) (*db.ExtendMsgSet, error) {
getExtendMsgSet := func() (string, error) {
extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{IncludeExtendMsgs: false})
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgSet failed")
}
bytes, err := json.Marshal(extendMsgSet)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
extendMsgSetStr, err := db.DB.Rc.Fetch(extendMsgSetCache+db.GetExtendMsgSetID(ID, index), time.Second*30*60, getExtendMsgSet)
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsgSet := &db.ExtendMsgSet{}
err = json.Unmarshal([]byte(extendMsgSetStr), extendMsgSet)
if err != nil {
return nil, utils.Wrap(err, "Unmarshal failed")
}
return extendMsgSet, nil
}
func DelExtendMsgSetFromCache(ID string, index int32) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err")
}
func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) {
getExtendMsg := func() (string, error) {
extendMsg, err := db.DB.GetExtendMsgList(ID, index, extendMsgIndex, extendMsgIndex+1)
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgList failed")
}
if len(extendMsg) == 0 {
return "", nil
}
bytes, err := json.Marshal(extendMsg[0])
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex)), time.Second*30*60, getExtendMsg)
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsg := &db.ExtendMsg{}
err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
if err != nil {
return nil, utils.Wrap(err, "Unmarshal failed")
}
return extendMsg, nil
}
func DelExtendMsg(ID string, index, extendMsgIndex int32) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex))), "DelExtendMsg err")
}

File diff suppressed because it is too large Load Diff

View File

@ -23,6 +23,18 @@ message AdminLoginResp {
CommonResp commonResp = 4;
}
message GetUserTokenReq {
string operationID = 1;
string userID = 2;
int32 platformID = 3;
}
message GetUserTokenResp {
CommonResp commonResp = 1;
string token = 2;
int64 expTime = 3;
}
message AddUserRegisterAddFriendIDListReq {
string operationID = 1;
repeated string userIDList = 2;
@ -328,6 +340,7 @@ message GetUserIDByEmailAndPhoneNumberResp{
service adminCMS {
rpc AdminLogin(AdminLoginReq) returns(AdminLoginResp);
rpc AddUserRegisterAddFriendIDList(AddUserRegisterAddFriendIDListReq) returns(AddUserRegisterAddFriendIDListResp);
rpc ReduceUserRegisterAddFriendIDList(ReduceUserRegisterAddFriendIDListReq) returns(ReduceUserRegisterAddFriendIDListResp);
rpc GetUserRegisterAddFriendIDList(GetUserRegisterAddFriendIDListReq) returns(GetUserRegisterAddFriendIDListResp);
@ -357,4 +370,6 @@ service adminCMS {
rpc GetUserFriends(GetUserFriendsReq) returns(GetUserFriendsResp);
rpc GetUserIDByEmailAndPhoneNumber(GetUserIDByEmailAndPhoneNumberReq) returns(GetUserIDByEmailAndPhoneNumberResp);
rpc GetUserToken(GetUserTokenReq) returns(GetUserTokenResp);
}

View File

@ -166,5 +166,4 @@ service msg {
rpc GetSendMsgStatus(GetSendMsgStatusReq) returns(GetSendMsgStatusResp);
rpc GetSuperGroupMsg(GetSuperGroupMsgReq) returns(GetSuperGroupMsgResp);
rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp);
}

File diff suppressed because it is too large Load Diff

View File

@ -89,7 +89,6 @@ message MultiTerminalLoginCheckResp{
string errMsg = 2;
}
service relay {
rpc OnlinePushMsg(OnlinePushMsgReq) returns(OnlinePushMsgResp);
rpc GetUsersOnlineStatus(GetUsersOnlineStatusReq) returns(GetUsersOnlineStatusResp);

File diff suppressed because it is too large Load Diff

View File

@ -696,4 +696,12 @@ message DelMsgListResp{
string errMsg = 2;
}
message SetAppBackgroundStatusReq {
string userID = 1;
bool isBackground = 2;
}
message SetAppBackgroundStatusResp {
int32 errCode = 1;
string errMsg = 2;
}

View File

@ -96,6 +96,8 @@ func GetConversationIDBySessionType(sourceID string, sessionType int) string {
return "single_" + sourceID
case constant.GroupChatType:
return "group_" + sourceID
case constant.SuperGroupChatType:
return "super_group_" + sourceID
case constant.NotificationChatType:
return "notification_" + sourceID
}