mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge branch 'tuoyun'
This commit is contained in:
commit
b2a9867297
@ -1 +1 @@
|
|||||||
Subproject commit 992f76df0ee500a0377523b0780d3a85f2275755
|
Subproject commit 1c6c7af5393b3e9eefbaf16b673519ca863a6c2c
|
@ -102,7 +102,7 @@ func main() {
|
|||||||
conversationGroup.POST("/get_receive_message_opt", conversation.GetReceiveMessageOpt) //1
|
conversationGroup.POST("/get_receive_message_opt", conversation.GetReceiveMessageOpt) //1
|
||||||
conversationGroup.POST("/get_all_conversation_message_opt", conversation.GetAllConversationMessageOpt) //1
|
conversationGroup.POST("/get_all_conversation_message_opt", conversation.GetAllConversationMessageOpt) //1
|
||||||
}
|
}
|
||||||
|
apiThird.MinioInit()
|
||||||
log.NewPrivateLog("api")
|
log.NewPrivateLog("api")
|
||||||
ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port")
|
ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
@ -91,8 +91,8 @@ credential: #腾讯cos,发送图片、视频、文件时需要,请自行申
|
|||||||
bucket: openim
|
bucket: openim
|
||||||
location: us-east-1
|
location: us-east-1
|
||||||
endpoint: http://127.0.0.1:9000
|
endpoint: http://127.0.0.1:9000
|
||||||
accessKeyID: minioadmin
|
accessKeyID: user12345
|
||||||
secretAccessKey: minioadmin
|
secretAccessKey: key12345
|
||||||
|
|
||||||
|
|
||||||
rpcport: #rpc服务端口 默认即可
|
rpcport: #rpc服务端口 默认即可
|
||||||
@ -131,8 +131,9 @@ rpcregistername: #rpc注册服务名,默认即可
|
|||||||
log:
|
log:
|
||||||
storageLocation: ../logs/
|
storageLocation: ../logs/
|
||||||
rotationTime: 24
|
rotationTime: 24
|
||||||
remainRotationCount: 5 #日志数量
|
remainRotationCount: 3 #日志数量
|
||||||
remainLogLevel: 6 #日志级别 6表示全都打印,测试阶段建议设置为6
|
#日志级别 6表示全都打印,测试阶段建议设置为6
|
||||||
|
remainLogLevel: 6
|
||||||
elasticSearchSwitch: false
|
elasticSearchSwitch: false
|
||||||
elasticSearchAddr: [ 127.0.0.1:9201 ]
|
elasticSearchAddr: [ 127.0.0.1:9201 ]
|
||||||
elasticSearchUser: ""
|
elasticSearchUser: ""
|
||||||
@ -178,13 +179,6 @@ tokenpolicy:
|
|||||||
# Token effective time day as a unit
|
# Token effective time day as a unit
|
||||||
accessExpire: 3650 #token过期时间(天) 默认即可
|
accessExpire: 3650 #token过期时间(天) 默认即可
|
||||||
|
|
||||||
messagecallback:
|
|
||||||
callbackSwitch: false
|
|
||||||
callbackUrl: "http://www.xxx.com/msg/judge"
|
|
||||||
#TimeOut use second as unit
|
|
||||||
callbackTimeOut: 10
|
|
||||||
messagejudge:
|
|
||||||
isJudgeFriend: true
|
|
||||||
# c2c:
|
# c2c:
|
||||||
# callbackBeforeSendMsg:
|
# callbackBeforeSendMsg:
|
||||||
# switch: false
|
# switch: false
|
||||||
@ -199,6 +193,30 @@ iospush:
|
|||||||
pushSound: "xxx"
|
pushSound: "xxx"
|
||||||
badgeCount: true
|
badgeCount: true
|
||||||
|
|
||||||
|
callback:
|
||||||
|
# callback url 需要自行更换callback url
|
||||||
|
callbackUrl : "http://127.0.0.1:8080/callback"
|
||||||
|
# 开启关闭操作前后回调的配置
|
||||||
|
callbackbeforeSendSingleMsg:
|
||||||
|
enable: false # 回调是否启用
|
||||||
|
callbackTimeOut: 2 # 回调超时时间
|
||||||
|
CallbackFailedContinue: true # 回调超时是否继续执行代码
|
||||||
|
callbackAfterSendSingleMsg:
|
||||||
|
enable: false
|
||||||
|
callbackTimeOut: 2
|
||||||
|
callbackBeforeSendGroupMsg:
|
||||||
|
enable: false
|
||||||
|
callbackTimeOut: 2
|
||||||
|
CallbackFailedContinue: true
|
||||||
|
callbackAfterSendGroupMsg:
|
||||||
|
enable: false
|
||||||
|
callbackTimeOut: 2
|
||||||
|
callbackWordFilter:
|
||||||
|
enable: false
|
||||||
|
callbackTimeOut: 2
|
||||||
|
CallbackFailedContinue: true
|
||||||
|
|
||||||
|
|
||||||
notification:
|
notification:
|
||||||
groupCreated:
|
groupCreated:
|
||||||
conversation:
|
conversation:
|
||||||
|
@ -24,6 +24,10 @@ services:
|
|||||||
- ./components/mongodb/data/db:/data/db
|
- ./components/mongodb/data/db:/data/db
|
||||||
- ./components/mongodb/data/logs:/data/logs
|
- ./components/mongodb/data/logs:/data/logs
|
||||||
- ./components/mongodb/data/conf:/etc/mongo
|
- ./components/mongodb/data/conf:/etc/mongo
|
||||||
|
environment:
|
||||||
|
- TZ=Asia/Shanghai
|
||||||
|
# cache
|
||||||
|
- wiredTigerCacheSizeGB=1
|
||||||
# environment:
|
# environment:
|
||||||
# - MONGO_INITDB_ROOT_USERNAME=openIM
|
# - MONGO_INITDB_ROOT_USERNAME=openIM
|
||||||
# - MONGO_INITDB_ROOT_PASSWORD=openIM
|
# - MONGO_INITDB_ROOT_PASSWORD=openIM
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
url2 "net/url"
|
url2 "net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func MinioInit() {
|
||||||
minioUrl, err := url2.Parse(config.Config.Credential.Minio.Endpoint)
|
minioUrl, err := url2.Parse(config.Config.Credential.Minio.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError("", utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error())
|
log.NewError("", utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error())
|
||||||
@ -30,18 +30,23 @@ func init() {
|
|||||||
}
|
}
|
||||||
err = minioClient.MakeBucket(context.Background(), config.Config.Credential.Minio.Bucket, opt)
|
err = minioClient.MakeBucket(context.Background(), config.Config.Credential.Minio.Bucket, opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.NewInfo("", utils.GetSelfFuncName(), err.Error())
|
||||||
exists, err := minioClient.BucketExists(context.Background(), config.Config.Credential.Minio.Bucket)
|
exists, err := minioClient.BucketExists(context.Background(), config.Config.Credential.Minio.Bucket)
|
||||||
if err == nil && exists {
|
if err == nil && exists {
|
||||||
log.NewInfo("", utils.GetSelfFuncName(), "We already own %s\n", config.Config.Credential.Minio.Bucket)
|
log.NewInfo("", utils.GetSelfFuncName(), "We already own %s\n", config.Config.Credential.Minio.Bucket)
|
||||||
} else {
|
} else {
|
||||||
log.NewError("", utils.GetSelfFuncName(), "create bucket failed and bucket not exists", err.Error())
|
if err != nil {
|
||||||
|
log.NewError("", utils.GetSelfFuncName(), err.Error())
|
||||||
|
}
|
||||||
|
log.NewError("", utils.GetSelfFuncName(), "create bucket failed and bucket not exists")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 自动化桶public的代码
|
||||||
//err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite)
|
//err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite)
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
// log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in ", err.Error())
|
// log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in ", err.Error())
|
||||||
// return
|
// return`z
|
||||||
//}
|
//}
|
||||||
log.NewInfo("", utils.GetSelfFuncName(), "minio create and set policy success")
|
log.NewInfo("", utils.GetSelfFuncName(), "minio create and set policy success")
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
|
"Open_IM/pkg/common/token_verify"
|
||||||
_ "Open_IM/pkg/common/token_verify"
|
_ "Open_IM/pkg/common/token_verify"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@ -23,12 +24,12 @@ func MinioStorageCredential(c *gin.Context) {
|
|||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"))
|
ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"))
|
||||||
//if !ok {
|
if !ok {
|
||||||
// log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token"))
|
log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token"))
|
||||||
// c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"})
|
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"})
|
||||||
// return
|
return
|
||||||
//}
|
}
|
||||||
var stsOpts cr.STSAssumeRoleOptions
|
var stsOpts cr.STSAssumeRoleOptions
|
||||||
stsOpts.AccessKey = config.Config.Credential.Minio.AccessKeyID
|
stsOpts.AccessKey = config.Config.Credential.Minio.AccessKeyID
|
||||||
stsOpts.SecretKey = config.Config.Credential.Minio.SecretAccessKey
|
stsOpts.SecretKey = config.Config.Credential.Minio.SecretAccessKey
|
||||||
@ -45,11 +46,6 @@ func MinioStorageCredential(c *gin.Context) {
|
|||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
log.NewError("0", utils.GetSelfFuncName(), err.Error())
|
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resp.SessionToken = v.SessionToken
|
resp.SessionToken = v.SessionToken
|
||||||
resp.SecretAccessKey = v.SecretAccessKey
|
resp.SecretAccessKey = v.SecretAccessKey
|
||||||
resp.AccessKeyID = v.AccessKeyID
|
resp.AccessKeyID = v.AccessKeyID
|
||||||
|
@ -53,7 +53,7 @@ func Login(c *gin.Context) {
|
|||||||
openIMGetUserToken.Secret = config.Config.Secret
|
openIMGetUserToken.Secret = config.Config.Secret
|
||||||
openIMGetUserToken.UserID = account
|
openIMGetUserToken.UserID = account
|
||||||
openIMGetUserTokenResp := api.UserTokenResp{}
|
openIMGetUserTokenResp := api.UserTokenResp{}
|
||||||
bMsg, err := http2.Post(url, openIMGetUserToken, config.Config.MessageCallBack.CallBackTimeOut)
|
bMsg, err := http2.Post(url, openIMGetUserToken, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(params.OperationID, "request openIM get user token error", account, "err", err.Error())
|
log.NewError(params.OperationID, "request openIM get user token error", account, "err", err.Error())
|
||||||
c.JSON(http.StatusOK, gin.H{"errCode": constant.GetIMTokenErr, "errMsg": err.Error()})
|
c.JSON(http.StatusOK, gin.H{"errCode": constant.GetIMTokenErr, "errMsg": err.Error()})
|
||||||
|
@ -12,10 +12,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type resetPasswordRequest struct {
|
type resetPasswordRequest struct {
|
||||||
VerificationCode string `json:"verificationCode"`
|
VerificationCode string `json:"verificationCode" binding:"required"`
|
||||||
Email string `json:"email"`
|
Email string `json:"email"`
|
||||||
PhoneNumber string `json:"phoneNumber"`
|
PhoneNumber string `json:"phoneNumber"`
|
||||||
NewPassword string `json:"newPassword"`
|
NewPassword string `json:"newPassword" binding:"required"`
|
||||||
OperationID string `json:"operationID"`
|
OperationID string `json:"operationID"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ func SetPassword(c *gin.Context) {
|
|||||||
openIMRegisterReq.Nickname = account
|
openIMRegisterReq.Nickname = account
|
||||||
openIMRegisterReq.Secret = config.Config.Secret
|
openIMRegisterReq.Secret = config.Config.Secret
|
||||||
openIMRegisterResp := api.UserRegisterResp{}
|
openIMRegisterResp := api.UserRegisterResp{}
|
||||||
bMsg, err := http2.Post(url, openIMRegisterReq, config.Config.MessageCallBack.CallBackTimeOut)
|
bMsg, err := http2.Post(url, openIMRegisterReq, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(params.OperationID, "request openIM register error", account, "err", err.Error())
|
log.NewError(params.OperationID, "request openIM register error", account, "err", err.Error())
|
||||||
c.JSON(http.StatusOK, gin.H{"errCode": constant.RegisterFailed, "errMsg": err.Error()})
|
c.JSON(http.StatusOK, gin.H{"errCode": constant.RegisterFailed, "errMsg": err.Error()})
|
||||||
|
@ -3,6 +3,8 @@ package gate
|
|||||||
import (
|
import (
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
|
"Open_IM/pkg/statistics"
|
||||||
|
"fmt"
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@ -12,6 +14,8 @@ var (
|
|||||||
validate *validator.Validate
|
validate *validator.Validate
|
||||||
ws WServer
|
ws WServer
|
||||||
rpcSvr RPCServer
|
rpcSvr RPCServer
|
||||||
|
sendMsgCount uint64
|
||||||
|
userCount uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
func Init(rpcPort, wsPort int) {
|
func Init(rpcPort, wsPort int) {
|
||||||
@ -19,6 +23,8 @@ func Init(rpcPort, wsPort int) {
|
|||||||
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
|
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
|
||||||
rwLock = new(sync.RWMutex)
|
rwLock = new(sync.RWMutex)
|
||||||
validate = validator.New()
|
validate = validator.New()
|
||||||
|
statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 10)
|
||||||
|
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 10)
|
||||||
ws.onInit(wsPort)
|
ws.onInit(wsPort)
|
||||||
rpcSvr.onInit(rpcPort)
|
rpcSvr.onInit(rpcPort)
|
||||||
}
|
}
|
||||||
|
@ -142,6 +142,7 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM
|
|||||||
|
|
||||||
}
|
}
|
||||||
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
||||||
|
sendMsgCount++
|
||||||
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID)
|
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID)
|
||||||
nReply := new(pbChat.SendMsgResp)
|
nReply := new(pbChat.SendMsgResp)
|
||||||
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
||||||
|
@ -183,7 +183,8 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok
|
|||||||
for _, v := range ws.wsUserToConn {
|
for _, v := range ws.wsUserToConn {
|
||||||
count = count + len(v)
|
count = count + len(v)
|
||||||
}
|
}
|
||||||
log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
|
log.Debug("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
|
||||||
|
userCount = uint64(len(ws.wsUserToConn))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,6 +211,7 @@ func (ws *WServer) delUserConn(conn *UserConn) {
|
|||||||
} else {
|
} else {
|
||||||
log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
|
log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
|
||||||
}
|
}
|
||||||
|
userCount = uint64(len(ws.wsUserToConn))
|
||||||
delete(ws.wsConnToUser, conn)
|
delete(ws.wsConnToUser, conn)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||||
pbMsg "Open_IM/pkg/proto/chat"
|
pbMsg "Open_IM/pkg/proto/chat"
|
||||||
pbPush "Open_IM/pkg/proto/push"
|
pbPush "Open_IM/pkg/proto/push"
|
||||||
|
"Open_IM/pkg/statistics"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string)
|
|||||||
type HistoryConsumerHandler struct {
|
type HistoryConsumerHandler struct {
|
||||||
msgHandle map[string]fcb
|
msgHandle map[string]fcb
|
||||||
historyConsumerGroup *kfk.MConsumerGroup
|
historyConsumerGroup *kfk.MConsumerGroup
|
||||||
|
singleMsgCount uint64
|
||||||
|
groupMsgCount uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *HistoryConsumerHandler) Init() {
|
func (mc *HistoryConsumerHandler) Init() {
|
||||||
|
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10)
|
||||||
|
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10)
|
||||||
|
|
||||||
mc.msgHandle = make(map[string]fcb)
|
mc.msgHandle = make(map[string]fcb)
|
||||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||||
@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
|||||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
mc.singleMsgCount++
|
||||||
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
||||||
}
|
}
|
||||||
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
||||||
@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
|||||||
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
mc.groupMsgCount++
|
||||||
}
|
}
|
||||||
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
||||||
default:
|
default:
|
||||||
|
@ -11,6 +11,8 @@ import (
|
|||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/kafka"
|
"Open_IM/pkg/common/kafka"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
|
"Open_IM/pkg/statistics"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -18,6 +20,7 @@ var (
|
|||||||
pushCh PushConsumerHandler
|
pushCh PushConsumerHandler
|
||||||
pushTerminal []int32
|
pushTerminal []int32
|
||||||
producer *kafka.Producer
|
producer *kafka.Producer
|
||||||
|
count uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
func Init(rpcPort int) {
|
func Init(rpcPort int) {
|
||||||
@ -28,6 +31,7 @@ func Init(rpcPort int) {
|
|||||||
}
|
}
|
||||||
func init() {
|
func init() {
|
||||||
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
|
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
|
||||||
|
statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 10), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run() {
|
func Run() {
|
||||||
|
@ -35,10 +35,10 @@ type AtContent struct {
|
|||||||
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||||
var wsResult []*pbRelay.SingleMsgToUser
|
var wsResult []*pbRelay.SingleMsgToUser
|
||||||
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
|
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
|
||||||
log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
|
log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
|
||||||
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
|
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
|
||||||
//Online push message
|
//Online push message
|
||||||
log.InfoByKv("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
|
log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
|
||||||
for _, v := range grpcCons {
|
for _, v := range grpcCons {
|
||||||
msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
|
msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
|
||||||
reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID})
|
reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID})
|
||||||
@ -51,6 +51,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData)
|
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData)
|
||||||
|
count++
|
||||||
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
||||||
for _, v := range wsResult {
|
for _, v := range wsResult {
|
||||||
if v.ResultCode == 0 {
|
if v.ResultCode == 0 {
|
||||||
|
1
internal/rpc/auth/callback.go
Normal file
1
internal/rpc/auth/callback.go
Normal file
@ -0,0 +1 @@
|
|||||||
|
package auth
|
1
internal/rpc/friend/callback.go
Normal file
1
internal/rpc/friend/callback.go
Normal file
@ -0,0 +1 @@
|
|||||||
|
package friend
|
9
internal/rpc/group/callback.go
Normal file
9
internal/rpc/group/callback.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package group
|
||||||
|
|
||||||
|
import (
|
||||||
|
pbGroup "Open_IM/pkg/proto/group"
|
||||||
|
)
|
||||||
|
|
||||||
|
func callbackBeforeCreateGroup(req *pbGroup.CreateGroupReq) (bool, error) {
|
||||||
|
return true, nil
|
||||||
|
}
|
@ -76,7 +76,12 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.OwnerUserID)
|
log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.OwnerUserID)
|
||||||
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
|
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
|
||||||
}
|
}
|
||||||
|
canCreate, err := callbackBeforeCreateGroup(req)
|
||||||
|
if err != nil || !canCreate {
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeCreateGroup failed", )
|
||||||
|
}
|
||||||
|
}
|
||||||
//Time stamp + MD5 to generate group chat id
|
//Time stamp + MD5 to generate group chat id
|
||||||
groupId := utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10))
|
groupId := utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10))
|
||||||
//to group
|
//to group
|
||||||
@ -84,7 +89,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
utils.CopyStructFields(&groupInfo, req.GroupInfo)
|
utils.CopyStructFields(&groupInfo, req.GroupInfo)
|
||||||
groupInfo.CreatorUserID = req.OpUserID
|
groupInfo.CreatorUserID = req.OpUserID
|
||||||
groupInfo.GroupID = groupId
|
groupInfo.GroupID = groupId
|
||||||
err := imdb.InsertIntoGroup(groupInfo)
|
err = imdb.InsertIntoGroup(groupInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(req.OperationID, "InsertIntoGroup failed, ", err.Error(), groupInfo)
|
log.NewError(req.OperationID, "InsertIntoGroup failed, ", err.Error(), groupInfo)
|
||||||
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB)
|
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB)
|
||||||
|
159
internal/rpc/msg/callback.go
Normal file
159
internal/rpc/msg/callback.go
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
package msg
|
||||||
|
|
||||||
|
import (
|
||||||
|
cbApi "Open_IM/pkg/call_back_struct"
|
||||||
|
"Open_IM/pkg/common/config"
|
||||||
|
"Open_IM/pkg/common/constant"
|
||||||
|
"Open_IM/pkg/common/http"
|
||||||
|
"Open_IM/pkg/common/log"
|
||||||
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
|
"Open_IM/pkg/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq {
|
||||||
|
return cbApi.CommonCallbackReq{
|
||||||
|
SendID: msg.MsgData.SendID,
|
||||||
|
ServerMsgID: msg.MsgData.ServerMsgID,
|
||||||
|
ClientMsgID: msg.MsgData.ClientMsgID,
|
||||||
|
OperationID: msg.OperationID,
|
||||||
|
SenderPlatformID: msg.MsgData.SenderPlatformID,
|
||||||
|
SenderNickname: msg.MsgData.SenderNickname,
|
||||||
|
SessionType: msg.MsgData.SessionType,
|
||||||
|
MsgFrom: msg.MsgData.MsgFrom,
|
||||||
|
ContentType: msg.MsgData.ContentType,
|
||||||
|
Status: msg.MsgData.Status,
|
||||||
|
CreateTime: msg.MsgData.CreateTime,
|
||||||
|
Content: string(msg.MsgData.Content),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
|
||||||
|
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
|
||||||
|
commonCallbackReq := copyCallbackCommonReqStruct(msg)
|
||||||
|
commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendSingleMsgCommand
|
||||||
|
req := cbApi.CallbackBeforeSendSingleMsgReq{
|
||||||
|
CommonCallbackReq: commonCallbackReq,
|
||||||
|
RecvID: msg.MsgData.RecvID,
|
||||||
|
}
|
||||||
|
resp := &cbApi.CallbackBeforeSendSingleMsgResp{
|
||||||
|
CommonCallbackResp: cbApi.CommonCallbackResp{},
|
||||||
|
}
|
||||||
|
//utils.CopyStructFields(req, msg.MsgData)
|
||||||
|
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
|
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackTimeOut); err != nil {
|
||||||
|
if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error {
|
||||||
|
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
|
||||||
|
commonCallbackReq := copyCallbackCommonReqStruct(msg)
|
||||||
|
commonCallbackReq.CallbackCommand = constant.CallbackAfterSendSingleMsgCommand
|
||||||
|
req := cbApi.CallbackAfterSendSingleMsgReq{
|
||||||
|
CommonCallbackReq: commonCallbackReq,
|
||||||
|
RecvID: msg.MsgData.RecvID,
|
||||||
|
}
|
||||||
|
resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
|
||||||
|
//utils.CopyStructFields(req, msg.MsgData)
|
||||||
|
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
|
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) {
|
||||||
|
if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
|
||||||
|
commonCallbackReq := copyCallbackCommonReqStruct(msg)
|
||||||
|
commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendGroupMsgCommand
|
||||||
|
req := cbApi.CallbackAfterSendGroupMsgReq{
|
||||||
|
CommonCallbackReq: commonCallbackReq,
|
||||||
|
GroupID: msg.MsgData.GroupID,
|
||||||
|
}
|
||||||
|
resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
|
||||||
|
//utils.CopyStructFields(req, msg.MsgData)
|
||||||
|
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
|
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil {
|
||||||
|
if !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error {
|
||||||
|
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
|
||||||
|
commonCallbackReq := copyCallbackCommonReqStruct(msg)
|
||||||
|
commonCallbackReq.CallbackCommand = constant.CallbackAfterSendGroupMsgCommand
|
||||||
|
req := cbApi.CallbackAfterSendGroupMsgReq{
|
||||||
|
CommonCallbackReq: commonCallbackReq,
|
||||||
|
GroupID: msg.MsgData.GroupID,
|
||||||
|
}
|
||||||
|
resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
|
||||||
|
|
||||||
|
//utils.CopyStructFields(req, msg.MsgData)
|
||||||
|
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
|
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) {
|
||||||
|
if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
|
||||||
|
commonCallbackReq := copyCallbackCommonReqStruct(msg)
|
||||||
|
commonCallbackReq.CallbackCommand = constant.CallbackWordFilterCommand
|
||||||
|
req := cbApi.CallbackWordFilterReq{
|
||||||
|
CommonCallbackReq: commonCallbackReq,
|
||||||
|
}
|
||||||
|
resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}}
|
||||||
|
//utils.CopyStructFields(&req., msg.MsgData)
|
||||||
|
defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
|
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil {
|
||||||
|
if !config.Config.Callback.CallbackWordFilter.CallbackFailedContinue {
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), "callback failed and config disable, stop this operation")
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if resp.ErrCode == constant.CallbackHandleSuccess {
|
||||||
|
msg.MsgData.Content = []byte(resp.Content)
|
||||||
|
}
|
||||||
|
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content))
|
||||||
|
}
|
||||||
|
return true, err
|
||||||
|
}
|
@ -4,7 +4,6 @@ import (
|
|||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db"
|
"Open_IM/pkg/common/db"
|
||||||
http2 "Open_IM/pkg/common/http"
|
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||||
pbChat "Open_IM/pkg/proto/chat"
|
pbChat "Open_IM/pkg/proto/chat"
|
||||||
@ -12,11 +11,9 @@ import (
|
|||||||
sdk_ws "Open_IM/pkg/proto/sdk_ws"
|
sdk_ws "Open_IM/pkg/proto/sdk_ws"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"github.com/garyburd/redigo/redis"
|
"github.com/garyburd/redigo/redis"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -127,25 +124,27 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|||||||
if !isHistory {
|
if !isHistory {
|
||||||
mReq.IsOnlineOnly = true
|
mReq.IsOnlineOnly = true
|
||||||
}
|
}
|
||||||
mResp := MsgCallBackResp{}
|
|
||||||
if config.Config.MessageCallBack.CallbackSwitch {
|
// callback
|
||||||
bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut)
|
canSend, err := callbackWordFilter(pb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error())
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter failed", err.Error(), pb.MsgData)
|
||||||
return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0)
|
|
||||||
} else if err = json.Unmarshal(bMsg, &mResp); err != nil {
|
|
||||||
log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error())
|
|
||||||
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
|
|
||||||
} else {
|
|
||||||
if mResp.ErrCode != 0 {
|
|
||||||
return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0)
|
|
||||||
} else {
|
|
||||||
pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if !canSend {
|
||||||
|
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", canSend, "end rpc and return", pb.MsgData)
|
||||||
|
return returnMsg(&replay, pb, 201, "callbackWordFilter result stop rpc and return", "", 0)
|
||||||
}
|
}
|
||||||
switch pb.MsgData.SessionType {
|
switch pb.MsgData.SessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
|
// callback
|
||||||
|
canSend, err := callbackBeforeSendSingleMsg(pb)
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg failed", err.Error())
|
||||||
|
}
|
||||||
|
if !canSend {
|
||||||
|
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", canSend, "end rpc and return")
|
||||||
|
return returnMsg(&replay, pb, 201, "callbackBeforeSendSingleMsg result stop rpc and return", "", 0)
|
||||||
|
}
|
||||||
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
|
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
|
||||||
if isSend {
|
if isSend {
|
||||||
msgToMQ.MsgData = pb.MsgData
|
msgToMQ.MsgData = pb.MsgData
|
||||||
@ -163,8 +162,21 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// callback
|
||||||
|
if err := callbackAfterSendSingleMsg(pb); err != nil {
|
||||||
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error())
|
||||||
|
}
|
||||||
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
|
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
|
||||||
case constant.GroupChatType:
|
case constant.GroupChatType:
|
||||||
|
// callback
|
||||||
|
canSend, err := callbackBeforeSendGroupMsg(pb)
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg failed", err.Error())
|
||||||
|
}
|
||||||
|
if !canSend {
|
||||||
|
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg result", canSend, "end rpc and return")
|
||||||
|
return returnMsg(&replay, pb, 201, "callbackBeforeSendGroupMsg result stop rpc and return", "", 0)
|
||||||
|
}
|
||||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
|
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
|
||||||
client := pbGroup.NewGroupClient(etcdConn)
|
client := pbGroup.NewGroupClient(etcdConn)
|
||||||
req := &pbGroup.GetGroupAllMemberReq{
|
req := &pbGroup.GetGroupAllMemberReq{
|
||||||
@ -228,7 +240,10 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// callback
|
||||||
|
if err := callbackAfterSendGroupMsg(pb); err != nil {
|
||||||
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error())
|
||||||
}
|
}
|
||||||
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
|
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
|
||||||
default:
|
default:
|
||||||
@ -245,7 +260,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error {
|
|||||||
}
|
}
|
||||||
func GetMsgID(sendID string) string {
|
func GetMsgID(sendID string) string {
|
||||||
t := time.Now().Format("2006-01-02 15:04:05")
|
t := time.Now().Format("2006-01-02 15:04:05")
|
||||||
return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
|
return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
|
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
|
||||||
|
@ -164,6 +164,18 @@ func GetRangeDate(from, to time.Time) [][2]time.Time {
|
|||||||
}
|
}
|
||||||
// month
|
// month
|
||||||
case !isInOneMonth(from, to):
|
case !isInOneMonth(from, to):
|
||||||
|
if to.Sub(from) < time.Hour * 24 * 30 {
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
fromTime := from.Add(time.Hour * 24 * time.Duration(i))
|
||||||
|
toTime := from.Add(time.Hour * 24 * time.Duration(i+1))
|
||||||
|
if toTime.After(to.Add(time.Hour * 24)) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
times = append(times, [2]time.Time{
|
||||||
|
fromTime, toTime,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
fromTime := from
|
fromTime := from
|
||||||
@ -188,6 +200,7 @@ func GetRangeDate(from, to time.Time) [][2]time.Time {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return times
|
return times
|
||||||
}
|
}
|
||||||
|
|
||||||
|
1
internal/rpc/user/callback.go
Normal file
1
internal/rpc/user/callback.go
Normal file
@ -0,0 +1 @@
|
|||||||
|
package user
|
1
internal/utils/callback.go
Normal file
1
internal/utils/callback.go
Normal file
@ -0,0 +1 @@
|
|||||||
|
package utils
|
@ -1 +1,26 @@
|
|||||||
package call_back_struct
|
package call_back_struct
|
||||||
|
|
||||||
|
type CommonCallbackReq struct {
|
||||||
|
SendID string `json:"sendID"`
|
||||||
|
CallbackCommand string `json:"callbackCommand"`
|
||||||
|
ServerMsgID string `json:"serverMsgID"`
|
||||||
|
ClientMsgID string `json:"clientMsgID"`
|
||||||
|
OperationID string `json:"operationID"`
|
||||||
|
SenderPlatformID int32 `json:"senderPlatformID"`
|
||||||
|
SenderNickname string `json:"senderNickname"`
|
||||||
|
SessionType int32 `json:"sessionType"`
|
||||||
|
MsgFrom int32 `json:"msgFrom"`
|
||||||
|
ContentType int32 `json:"contentType"`
|
||||||
|
Status int32 `json:"status"`
|
||||||
|
CreateTime int64 `json:"createTime"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CommonCallbackResp struct {
|
||||||
|
ActionCode int `json:"actionCode"`
|
||||||
|
ErrCode int `json:"errCode"`
|
||||||
|
ErrMsg string `json:"errMsg"`
|
||||||
|
OperationID string `json:"operationID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
9
pkg/call_back_struct/group.go
Normal file
9
pkg/call_back_struct/group.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package call_back_struct
|
||||||
|
|
||||||
|
type CallbackBeforeCreateGroupReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterCreateGroupResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
@ -1 +1,47 @@
|
|||||||
package call_back_struct
|
package call_back_struct
|
||||||
|
|
||||||
|
|
||||||
|
type CallbackBeforeSendSingleMsgReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeSendSingleMsgResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterSendSingleMsgReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterSendSingleMsgResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeSendGroupMsgReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackBeforeSendGroupMsgResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterSendGroupMsgReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterSendGroupMsgResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackWordFilterReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackWordFilterResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
Content string `json:"content"`
|
||||||
|
}
|
||||||
|
@ -19,6 +19,12 @@ var (
|
|||||||
|
|
||||||
var Config config
|
var Config config
|
||||||
|
|
||||||
|
type callBackConfig struct {
|
||||||
|
Enable bool `yaml:"enable"`
|
||||||
|
CallbackTimeOut int `yaml:"callbackTimeOut"`
|
||||||
|
CallbackFailedContinue bool `CallbackFailedContinue`
|
||||||
|
}
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
ServerIP string `yaml:"serverip"`
|
ServerIP string `yaml:"serverip"`
|
||||||
ServerVersion string `yaml:"serverversion"`
|
ServerVersion string `yaml:"serverversion"`
|
||||||
@ -167,11 +173,6 @@ type config struct {
|
|||||||
AccessSecret string `yaml:"accessSecret"`
|
AccessSecret string `yaml:"accessSecret"`
|
||||||
AccessExpire int64 `yaml:"accessExpire"`
|
AccessExpire int64 `yaml:"accessExpire"`
|
||||||
}
|
}
|
||||||
MessageCallBack struct {
|
|
||||||
CallbackSwitch bool `yaml:"callbackSwitch"`
|
|
||||||
CallbackUrl string `yaml:"callbackUrl"`
|
|
||||||
CallBackTimeOut int `yaml:"callbackTimeOut"`
|
|
||||||
}
|
|
||||||
MessageJudge struct {
|
MessageJudge struct {
|
||||||
IsJudgeFriend bool `yaml:"isJudgeFriend"`
|
IsJudgeFriend bool `yaml:"isJudgeFriend"`
|
||||||
}
|
}
|
||||||
@ -179,6 +180,15 @@ type config struct {
|
|||||||
PushSound string `yaml:"pushSound"`
|
PushSound string `yaml:"pushSound"`
|
||||||
BadgeCount bool `yaml:"badgeCount"`
|
BadgeCount bool `yaml:"badgeCount"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Callback struct {
|
||||||
|
CallbackUrl string `yaml:"callbackUrl"`
|
||||||
|
CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackbeforeSendSingleMsg"`
|
||||||
|
CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"`
|
||||||
|
CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"`
|
||||||
|
CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"`
|
||||||
|
CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"`
|
||||||
|
} `yaml:"callback"`
|
||||||
Notification struct {
|
Notification struct {
|
||||||
///////////////////////group/////////////////////////////
|
///////////////////////group/////////////////////////////
|
||||||
GroupCreated struct {
|
GroupCreated struct {
|
||||||
|
@ -136,6 +136,19 @@ const (
|
|||||||
VerificationCodeForReset = 2
|
VerificationCodeForReset = 2
|
||||||
VerificationCodeForRegisterSuffix = "_forRegister"
|
VerificationCodeForRegisterSuffix = "_forRegister"
|
||||||
VerificationCodeForResetSuffix = "_forReset"
|
VerificationCodeForResetSuffix = "_forReset"
|
||||||
|
|
||||||
|
//callbackCommand
|
||||||
|
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
|
||||||
|
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
|
||||||
|
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
|
||||||
|
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
|
||||||
|
CallbackWordFilterCommand = "callbackWordFilterCommand"
|
||||||
|
//callback actionCode
|
||||||
|
ActionAllow = 0
|
||||||
|
ActionForbidden = 1
|
||||||
|
//callback callbackHandleCode
|
||||||
|
CallbackHandleSuccess = 0
|
||||||
|
CallbackHandleFailed = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
var ContentType2PushContent = map[int64]string{
|
var ContentType2PushContent = map[int64]string{
|
||||||
|
@ -51,6 +51,7 @@ var (
|
|||||||
ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()}
|
ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()}
|
||||||
ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()}
|
ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()}
|
||||||
ErrArgs = ErrInfo{ErrCode: 8003, ErrMsg: ArgsMsg.Error()}
|
ErrArgs = ErrInfo{ErrCode: 8003, ErrMsg: ArgsMsg.Error()}
|
||||||
|
ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()}
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -64,6 +65,7 @@ var (
|
|||||||
AccessMsg = errors.New("no permission")
|
AccessMsg = errors.New("no permission")
|
||||||
DBMsg = errors.New("db failed")
|
DBMsg = errors.New("db failed")
|
||||||
ArgsMsg = errors.New("args failed")
|
ArgsMsg = errors.New("args failed")
|
||||||
|
CallBackMsg = errors.New("callback failed")
|
||||||
|
|
||||||
ThirdPartyMsg = errors.New("third party error")
|
ThirdPartyMsg = errors.New("third party error")
|
||||||
)
|
)
|
||||||
|
@ -26,7 +26,7 @@ func GetFriendRelationshipFromFriend(OwnerUserID, FriendUserID string) (*db.Frie
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var friend db.Friend
|
var friend db.Friend
|
||||||
err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Find(&friend).Error
|
err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Take(&friend).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func GetFriendApplicationByBothUserID(FromUserID, ToUserID string) (*db.FriendRe
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var friendRequest db.FriendRequest
|
var friendRequest db.FriendRequest
|
||||||
err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Find(&friendRequest).Error
|
err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Take(&friendRequest).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db"
|
"Open_IM/pkg/common/db"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -43,6 +44,7 @@ func GetGroupMemberListByUserID(userID string) ([]db.GroupMember, error) {
|
|||||||
}
|
}
|
||||||
var groupMemberList []db.GroupMember
|
var groupMemberList []db.GroupMember
|
||||||
err = dbConn.Table("group_members").Where("user_id=?", userID).Find(&groupMemberList).Error
|
err = dbConn.Table("group_members").Where("user_id=?", userID).Find(&groupMemberList).Error
|
||||||
|
//err = dbConn.Table("group_members").Where("user_id=?", userID).Take(&groupMemberList).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -82,7 +84,7 @@ func GetGroupMemberInfoByGroupIDAndUserID(groupID, userID string) (*db.GroupMemb
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var groupMember db.GroupMember
|
var groupMember db.GroupMember
|
||||||
err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Find(&groupMember).Error
|
err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Take(&groupMember).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -149,7 +151,8 @@ func GetGroupOwnerInfoByGroupID(groupID string) (*db.GroupMember, error) {
|
|||||||
return &v, nil
|
return &v, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, nil
|
|
||||||
|
return nil, utils.Wrap(errors.New("no owner"), "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsExistGroupMember(groupID, userID string) bool {
|
func IsExistGroupMember(groupID, userID string) bool {
|
||||||
|
@ -45,7 +45,7 @@ func GetGroupInfoByGroupID(groupId string) (*db.Group, error) {
|
|||||||
return nil, utils.Wrap(err, "")
|
return nil, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
var groupInfo db.Group
|
var groupInfo db.Group
|
||||||
err = dbConn.Table("groups").Where("group_id=?", groupId).Find(&groupInfo).Error
|
err = dbConn.Table("groups").Where("group_id=?", groupId).Take(&groupInfo).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -86,7 +86,6 @@ func GetGroups(pageNumber, showNumber int) ([]db.Group, error) {
|
|||||||
return groups, nil
|
return groups, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func OperateGroupStatus(groupId string, groupStatus int32) error {
|
func OperateGroupStatus(groupId string, groupStatus int32) error {
|
||||||
group := db.Group{
|
group := db.Group{
|
||||||
GroupID: groupId,
|
GroupID: groupId,
|
||||||
@ -98,7 +97,6 @@ func OperateGroupStatus(groupId string, groupStatus int32) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func DeleteGroup(groupId string) error {
|
func DeleteGroup(groupId string) error {
|
||||||
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
|
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -129,8 +127,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string,
|
|||||||
updateInfo := db.GroupMember{
|
updateInfo := db.GroupMember{
|
||||||
RoleLevel: roleLevel,
|
RoleLevel: roleLevel,
|
||||||
}
|
}
|
||||||
groupMaster := db.GroupMember{
|
groupMaster := db.GroupMember{}
|
||||||
}
|
|
||||||
switch roleLevel {
|
switch roleLevel {
|
||||||
case constant.GroupOwner:
|
case constant.GroupOwner:
|
||||||
err = dbConn.Transaction(func(tx *gorm.DB) error {
|
err = dbConn.Transaction(func(tx *gorm.DB) error {
|
||||||
|
@ -63,7 +63,7 @@ func GetGroupRequestByGroupIDAndUserID(groupID, userID string) (*db.GroupRequest
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var groupRequest db.GroupRequest
|
var groupRequest db.GroupRequest
|
||||||
err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Find(&groupRequest).Error
|
err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Take(&groupRequest).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -128,7 +128,6 @@ func GetUserReqGroupByUserID(userID string) ([]db.GroupRequest, error) {
|
|||||||
return groupRequestList, err
|
return groupRequestList, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//
|
//
|
||||||
//func GroupApplicationResponse(pb *group.GroupApplicationResponseReq) (*group.CommonResp, error) {
|
//func GroupApplicationResponse(pb *group.GroupApplicationResponseReq) (*group.CommonResp, error) {
|
||||||
//
|
//
|
||||||
|
@ -51,18 +51,6 @@ func UserRegister(user db.User) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type User struct {
|
|
||||||
UserID string `gorm:"column:user_id;primaryKey;"`
|
|
||||||
Nickname string `gorm:"column:name"`
|
|
||||||
FaceUrl string `gorm:"column:icon"`
|
|
||||||
Gender int32 `gorm:"column:gender"`
|
|
||||||
PhoneNumber string `gorm:"column:phone_number"`
|
|
||||||
Birth string `gorm:"column:birth"`
|
|
||||||
Email string `gorm:"column:email"`
|
|
||||||
Ex string `gorm:"column:ex"`
|
|
||||||
CreateTime time.Time `gorm:"column:create_time"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func DeleteUser(userID string) (i int64) {
|
func DeleteUser(userID string) (i int64) {
|
||||||
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
|
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -78,7 +66,7 @@ func GetUserByUserID(userID string) (*db.User, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var user db.User
|
var user db.User
|
||||||
err = dbConn.Table("users").Where("user_id=?", userID).First(&user).Error
|
err = dbConn.Table("users").Where("user_id=?", userID).Take(&user).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ package im_mysql_msg_model
|
|||||||
import (
|
import (
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db"
|
"Open_IM/pkg/common/db"
|
||||||
|
"Open_IM/pkg/common/log"
|
||||||
pbMsg "Open_IM/pkg/proto/chat"
|
pbMsg "Open_IM/pkg/proto/chat"
|
||||||
"Open_IM/pkg/proto/sdk_ws"
|
"Open_IM/pkg/proto/sdk_ws"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
@ -45,5 +46,6 @@ func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error {
|
|||||||
}
|
}
|
||||||
chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime)
|
chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime)
|
||||||
chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime)
|
chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime)
|
||||||
|
log.NewDebug("test", "this is ", chatLog)
|
||||||
return dbConn.Table("chat_logs").Create(chatLog).Error
|
return dbConn.Table("chat_logs").Create(chatLog).Error
|
||||||
}
|
}
|
||||||
|
@ -53,3 +53,14 @@ func Post(url string, data interface{}, timeOutSecond int) (content []byte, err
|
|||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PostReturn(url string, input, output interface{}, timeOut int) error {
|
||||||
|
b, err := Post(url, input, timeOut)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = json.Unmarshal(b, output); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -28,6 +28,17 @@ func (f *fileHook) Fire(entry *logrus.Entry) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//func (f *fileHook) Fire(entry *logrus.Entry) error {
|
||||||
|
// var s string
|
||||||
|
// _, b, c, _ := runtime.Caller(10)
|
||||||
|
// i := strings.SplitAfter(b, "/")
|
||||||
|
// if len(i) > 3 {
|
||||||
|
// s = i[len(i)-3] + i[len(i)-2] + i[len(i)-1] + ":" + utils.IntToString(c)
|
||||||
|
// }
|
||||||
|
// entry.Data["FilePath"] = s
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
func findCaller(skip int) string {
|
func findCaller(skip int) string {
|
||||||
file := ""
|
file := ""
|
||||||
line := 0
|
line := 0
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package getcdv3
|
package getcdv3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"Open_IM/pkg/common/log"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
@ -39,9 +40,9 @@ func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceNam
|
|||||||
func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {
|
func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {
|
||||||
cli, err := clientv3.New(clientv3.Config{
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second})
|
Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second})
|
||||||
fmt.Println("RegisterEtcd")
|
|
||||||
|
log.Info("", "RegisterEtcd, ", schema, etcdAddr, myHost, myPort, serviceName, ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// return fmt.Errorf("grpclb: create clientv3 client failed: %v", err)
|
|
||||||
return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr)
|
return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,15 +67,16 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("keepalive failed, errmsg:%v, lease id:%d", err, resp.ID)
|
return fmt.Errorf("keepalive failed, errmsg:%v, lease id:%d", err, resp.ID)
|
||||||
}
|
}
|
||||||
fmt.Println("RegisterEtcd ok")
|
log.Info("", "RegisterEtcd ok ")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case v, ok := <-kresp:
|
case v, ok := <-kresp:
|
||||||
if ok == true {
|
if ok == true {
|
||||||
// fmt.Println(" kresp ok ", v)
|
log.Debug("", "KeepAlive kresp ok", v, schema, etcdAddr, myHost, myPort, serviceName, ttl)
|
||||||
} else {
|
} else {
|
||||||
fmt.Println(" kresp failed ", v)
|
log.Error("", "KeepAlive kresp failed", schema, etcdAddr, myHost, myPort, serviceName, ttl)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package getcdv3
|
package getcdv3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"Open_IM/pkg/common/log"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
@ -102,7 +103,7 @@ func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts re
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
var addrList []resolver.Address
|
var addrList []resolver.Address
|
||||||
for i := range resp.Kvs {
|
for i := range resp.Kvs {
|
||||||
fmt.Println("init addr: ", string(resp.Kvs[i].Value))
|
log.Debug("", "init addr: ", string(resp.Kvs[i].Value))
|
||||||
addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)})
|
addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)})
|
||||||
}
|
}
|
||||||
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
||||||
@ -148,27 +149,27 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) {
|
|||||||
if !exists(addrList, string(ev.Kv.Value)) {
|
if !exists(addrList, string(ev.Kv.Value)) {
|
||||||
flag = 1
|
flag = 1
|
||||||
addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)})
|
addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)})
|
||||||
fmt.Println("after add, new list: ", addrList)
|
log.Debug("", "after add, new list: ", addrList)
|
||||||
}
|
}
|
||||||
case mvccpb.DELETE:
|
case mvccpb.DELETE:
|
||||||
fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value))
|
log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value))
|
||||||
i := strings.LastIndexAny(string(ev.Kv.Key), "/")
|
i := strings.LastIndexAny(string(ev.Kv.Key), "/")
|
||||||
if i < 0 {
|
if i < 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t := string(ev.Kv.Key)[i+1:]
|
t := string(ev.Kv.Key)[i+1:]
|
||||||
fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t)
|
log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t)
|
||||||
if s, ok := remove(addrList, t); ok {
|
if s, ok := remove(addrList, t); ok {
|
||||||
flag = 1
|
flag = 1
|
||||||
addrList = s
|
addrList = s
|
||||||
fmt.Println("after remove, new list: ", addrList)
|
log.Debug("after remove, new list: ", addrList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if flag == 1 {
|
if flag == 1 {
|
||||||
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
||||||
fmt.Println("update: ", addrList)
|
log.Debug("update: ", addrList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -176,7 +177,7 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) {
|
|||||||
func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
|
func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
|
||||||
gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")})
|
gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("eeeeeeeeeeeee", err.Error())
|
log.Error("clientv3.New failed", err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,7 +201,7 @@ func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
gEtcdCli.Close()
|
gEtcdCli.Close()
|
||||||
fmt.Println("rrrrrrrrrrr", err.Error())
|
log.Error("gEtcdCli.Get failed", err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
gEtcdCli.Close()
|
gEtcdCli.Close()
|
||||||
@ -236,7 +237,7 @@ func GetConnPool(schema, etcdaddr, servicename string) (*ClientConn, error) {
|
|||||||
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
|
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
|
||||||
|
|
||||||
c, err := p.Get(ctx)
|
c, err := p.Get(ctx)
|
||||||
fmt.Println(err)
|
log.Info("", "Get ", err)
|
||||||
return c, err
|
return c, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -338,6 +338,36 @@ message ConversationUpdateTips{
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///callback
|
||||||
|
message CommonCallbackURLReq {
|
||||||
|
string CallbackCommand = 1 [json_name = "code"];
|
||||||
|
string OpenIMServerID = 2;
|
||||||
|
string OperationID = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CommonCallbackURLResp {
|
||||||
|
string Code = 1 [json_name = "code"];
|
||||||
|
string Msg = 2 [json_name = "msg"];
|
||||||
|
string OperationID = 3 [json_name = "operationID"];
|
||||||
|
}
|
||||||
|
|
||||||
|
message CallbackBeforeSendMsgReq {
|
||||||
|
commonReq CommonCallbackURLReq = 1;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message CallbackBeforeSendMsgResp {
|
||||||
|
commonResp CommonCallbackURLResp = 1;
|
||||||
|
string FromUserID = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CallbackAfterAddFriendReq {
|
||||||
|
commonReq CommonCallbackURLReq = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CallbackAfterAddFriendResp {
|
||||||
|
commonResp CommonCallbackURLResp = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
///cms
|
///cms
|
||||||
|
31
pkg/statistics/statistics.go
Normal file
31
pkg/statistics/statistics.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package statistics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Statistics struct {
|
||||||
|
Count *uint64
|
||||||
|
ModuleName string
|
||||||
|
PrintArgs string
|
||||||
|
SleepTime int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Statistics) output() {
|
||||||
|
t := time.NewTicker(time.Duration(s.SleepTime) * time.Second)
|
||||||
|
defer t.Stop()
|
||||||
|
//var sum uint64
|
||||||
|
for {
|
||||||
|
//sum = *s.Count
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
}
|
||||||
|
//log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics {
|
||||||
|
p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs}
|
||||||
|
go p.output()
|
||||||
|
return p
|
||||||
|
}
|
@ -3,6 +3,8 @@
|
|||||||
source ./style_info.cfg
|
source ./style_info.cfg
|
||||||
source ./path_info.cfg
|
source ./path_info.cfg
|
||||||
source ./function.sh
|
source ./function.sh
|
||||||
|
ulimit -n 200000
|
||||||
|
|
||||||
list1=$(cat $config_path | grep openImOnlineRelayPort | awk -F '[:]' '{print $NF}')
|
list1=$(cat $config_path | grep openImOnlineRelayPort | awk -F '[:]' '{print $NF}')
|
||||||
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
|
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
|
||||||
list_to_string $list1
|
list_to_string $list1
|
||||||
|
@ -6,6 +6,7 @@ source ./function.sh
|
|||||||
list1=$(cat $config_path | grep openImApiPort | awk -F '[:]' '{print $NF}')
|
list1=$(cat $config_path | grep openImApiPort | awk -F '[:]' '{print $NF}')
|
||||||
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
|
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
|
||||||
list3=$(cat $config_path | grep openImSdkWsPort | awk -F '[:]' '{print $NF}')
|
list3=$(cat $config_path | grep openImSdkWsPort | awk -F '[:]' '{print $NF}')
|
||||||
|
logLevel=$(cat $config_path | grep remainLogLevel | awk -F '[:]' '{print $NF}')
|
||||||
list_to_string $list1
|
list_to_string $list1
|
||||||
api_ports=($ports_array)
|
api_ports=($ports_array)
|
||||||
list_to_string $list2
|
list_to_string $list2
|
||||||
@ -26,7 +27,7 @@ fi
|
|||||||
#Waiting port recycling
|
#Waiting port recycling
|
||||||
sleep 1
|
sleep 1
|
||||||
cd ${sdk_server_binary_root}
|
cd ${sdk_server_binary_root}
|
||||||
nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} >>../logs/openIM.log 2>&1 &
|
nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 &
|
||||||
|
|
||||||
#Check launched service process
|
#Check launched service process
|
||||||
sleep 3
|
sleep 3
|
||||||
|
Loading…
x
Reference in New Issue
Block a user