ws connID

This commit is contained in:
wangchuxiao 2022-12-05 18:29:58 +08:00
parent 75629ba5ea
commit 837efc90b6
5 changed files with 26 additions and 13 deletions

View File

@ -9,7 +9,7 @@ import (
"time" "time"
) )
func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp { func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackgroundStatusChanged bool, connID string) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOnline.Enable { if !config.Config.Callback.CallbackUserOnline.Enable {
return callbackResp return callbackResp
@ -27,6 +27,7 @@ func callbackUserOnline(operationID, userID string, platformID int, token string
}, },
Seq: int(time.Now().UnixNano() / 1e6), Seq: int(time.Now().UnixNano() / 1e6),
IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged, IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged,
ConnID: connID,
} }
callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: &callbackResp} callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOnlineCommand, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil { if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOnlineCommand, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil {
@ -36,7 +37,7 @@ func callbackUserOnline(operationID, userID string, platformID int, token string
return callbackResp return callbackResp
} }
func callbackUserOffline(operationID, userID string, platformID int, isAppBackgroundStatusChanged bool) cbApi.CommonCallbackResp { func callbackUserOffline(operationID, userID string, platformID int, isAppBackgroundStatusChanged bool, connID string) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
if !config.Config.Callback.CallbackUserOffline.Enable { if !config.Config.Callback.CallbackUserOffline.Enable {
return callbackResp return callbackResp
@ -53,6 +54,7 @@ func callbackUserOffline(operationID, userID string, platformID int, isAppBackgr
}, },
Seq: int(time.Now().UnixNano() / 1e6), Seq: int(time.Now().UnixNano() / 1e6),
IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged, IsAppBackgroundStatusChanged: isAppBackgroundStatusChanged,
ConnID: connID,
} }
callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: &callbackResp} callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: &callbackResp}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOfflineCommand, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil { if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackUserOfflineCommand, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil {

View File

@ -407,12 +407,12 @@ func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
req := pData.(*sdk_ws.SetAppBackgroundStatusReq) req := pData.(*sdk_ws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground conn.IsBackground = req.IsBackground
if !conn.IsBackground { if !conn.IsBackground {
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, true) callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, true, conn.connID)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
} }
} else { } else {
callbackResp := callbackUserOffline(m.OperationID, conn.userID, int(conn.PlatformID), true) callbackResp := callbackUserOffline(m.OperationID, conn.userID, int(conn.PlatformID), true, conn.connID)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
} }

View File

@ -15,6 +15,7 @@ import (
"context" "context"
"encoding/gob" "encoding/gob"
"io/ioutil" "io/ioutil"
"strconv"
"strings" "strings"
go_redis "github.com/go-redis/redis/v8" go_redis "github.com/go-redis/redis/v8"
@ -37,6 +38,7 @@ type UserConn struct {
userID string userID string
IsBackground bool IsBackground bool
token string token string
connID string
} }
type WServer struct { type WServer struct {
@ -82,9 +84,9 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
log.Error(operationID, "upgrade http conn err", err.Error(), query) log.Error(operationID, "upgrade http conn err", err.Error(), query)
return return
} else { } else {
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0]} newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0], conn.RemoteAddr().String() + strconv.Itoa(int(utils.GetCurrentTimestampBySecond()))}
userCount++ userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], newConn.connID, operationID)
go ws.readMsg(newConn) go ws.readMsg(newConn)
} }
} else { } else {
@ -319,11 +321,11 @@ func (ws *WServer) sendKickMsg(oldConn *UserConn) {
} }
} }
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) { func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) {
rwLock.Lock() rwLock.Lock()
defer rwLock.Unlock() defer rwLock.Unlock()
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String()) log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String())
callbackResp := callbackUserOnline(operationID, uid, platformID, token, false) callbackResp := callbackUserOnline(operationID, uid, platformID, token, false, connID)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
} }
@ -386,7 +388,7 @@ func (ws *WServer) delUserConn(conn *UserConn) {
if err != nil { if err != nil {
log.Error(operationID, " close err", "", "uid", uid, "platform", platform) log.Error(operationID, " close err", "", "uid", uid, "platform", platform)
} }
callbackResp := callbackUserOffline(operationID, conn.userID, platform, false) callbackResp := callbackUserOffline(operationID, conn.userID, platform, false, conn.connID)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
} }

View File

@ -5,6 +5,7 @@ type CallbackUserOnlineReq struct {
Token string `json:"token"` Token string `json:"token"`
Seq int `json:"seq"` Seq int `json:"seq"`
IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"` IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"`
ConnID string `json:"connID"`
} }
type CallbackUserOnlineResp struct { type CallbackUserOnlineResp struct {
@ -13,8 +14,9 @@ type CallbackUserOnlineResp struct {
type CallbackUserOfflineReq struct { type CallbackUserOfflineReq struct {
UserStatusCallbackReq UserStatusCallbackReq
Seq int `json:"seq"` Seq int `json:"seq"`
IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"` IsAppBackgroundStatusChanged bool `json:"isAppBackgroundStatusChanged"`
ConnID string `json:"connID"`
} }
type CallbackUserOfflineResp struct { type CallbackUserOfflineResp struct {

View File

@ -25,7 +25,7 @@ type ExtendMsgSet struct {
} }
type ReactionExtendMsgSet struct { type ReactionExtendMsgSet struct {
TypeKey string `bson:"type_key" json:"typeKey"` UserKey string `bson:"user_key" json:"userKey"`
Value string `bson:"value" json:"value"` Value string `bson:"value" json:"value"`
} }
@ -82,13 +82,20 @@ func (d *DataBases) InsertExtendMsg(ID string, index int32, msg *ExtendMsg) (msg
return set.ExtendMsgNum, err return set.ExtendMsgNum, err
} }
func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, msg *ExtendMsg, msgSet *ExtendMsgSet) error { func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, userIndex string, value string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{}) _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{})
return err return err
} }
func (d *DataBases) DelOneExtendMsgSetUserKey(ID string, index, MsgIndex int32, userIndex string, userID string) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)})
return err
}
func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) { func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)