mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge remote-tracking branch 'origin/main'
This commit is contained in:
commit
0d54547dd3
@ -105,13 +105,13 @@ business data.
|
|||||||
- **MySQL**
|
- **MySQL**
|
||||||
|
|
||||||
```
|
```
|
||||||
待补充
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
- **MongoDB**
|
- **MongoDB**
|
||||||
|
|
||||||
```
|
```
|
||||||
待补充
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
6. Enter the script directory and execute the script according to the steps。
|
6. Enter the script directory and execute the script according to the steps。
|
||||||
|
26
src/msg_gateway/Makefile
Normal file
26
src/msg_gateway/Makefile
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
.PHONY: all build run gotool install clean help
|
||||||
|
|
||||||
|
BINARY_NAME=open_im_msg_gateway
|
||||||
|
BIN_DIR=../../bin/
|
||||||
|
LAN_FILE=.go
|
||||||
|
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||||
|
|
||||||
|
all: gotool build
|
||||||
|
|
||||||
|
build:
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
|
||||||
|
|
||||||
|
run:
|
||||||
|
@go run ./
|
||||||
|
|
||||||
|
gotool:
|
||||||
|
go fmt ./
|
||||||
|
go vet ./
|
||||||
|
|
||||||
|
install:
|
||||||
|
make build
|
||||||
|
mv ${BINARY_NAME} ${BIN_DIR}
|
||||||
|
|
||||||
|
clean:
|
||||||
|
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
||||||
|
|
29
src/msg_gateway/gate/init.go
Normal file
29
src/msg_gateway/gate/init.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package gate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
"github.com/go-playground/validator/v10"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
rwLock *sync.RWMutex
|
||||||
|
validate *validator.Validate
|
||||||
|
ws WServer
|
||||||
|
rpcSvr RPCServer
|
||||||
|
)
|
||||||
|
|
||||||
|
func Init(rpcPort, wsPort int) {
|
||||||
|
//log initialization
|
||||||
|
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
|
||||||
|
rwLock = new(sync.RWMutex)
|
||||||
|
validate = validator.New()
|
||||||
|
ws.onInit(wsPort)
|
||||||
|
rpcSvr.onInit(rpcPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Run() {
|
||||||
|
go ws.run()
|
||||||
|
go rpcSvr.run()
|
||||||
|
}
|
197
src/msg_gateway/gate/logic.go
Normal file
197
src/msg_gateway/gate/logic.go
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
package gate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/constant"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbChat "Open_IM/src/proto/chat"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) {
|
||||||
|
//ws online debug data
|
||||||
|
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
|
||||||
|
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
|
||||||
|
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
|
||||||
|
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
|
||||||
|
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
|
||||||
|
m := Req{}
|
||||||
|
if err := json.Unmarshal(jsonMsg, &m); err != nil {
|
||||||
|
log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error())
|
||||||
|
ws.sendErrMsg(conn, 200, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := validate.Struct(m); err != nil {
|
||||||
|
log.ErrorByKv("ws args validate err", "", "err", err.Error())
|
||||||
|
ws.sendErrMsg(conn, 201, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !utils.VerifyToken(m.Token, m.SendID) {
|
||||||
|
ws.sendErrMsg(conn, 202, "token validate err")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
|
||||||
|
|
||||||
|
switch m.ReqIdentifier {
|
||||||
|
case constant.WSGetNewestSeq:
|
||||||
|
ws.newestSeqReq(conn, &m)
|
||||||
|
case constant.WSPullMsg:
|
||||||
|
ws.pullMsgReq(conn, &m)
|
||||||
|
case constant.WSSendMsg:
|
||||||
|
ws.sendMsgReq(conn, &m)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) {
|
||||||
|
mReply := make(map[string]interface{})
|
||||||
|
mData := make(map[string]interface{})
|
||||||
|
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||||
|
mReply["msgIncr"] = m.MsgIncr
|
||||||
|
mReply["errCode"] = pb.GetErrCode()
|
||||||
|
mReply["errMsg"] = pb.GetErrMsg()
|
||||||
|
mData["seq"] = pb.GetSeq()
|
||||||
|
mReply["data"] = mData
|
||||||
|
ws.sendMsg(conn, mReply)
|
||||||
|
}
|
||||||
|
func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) {
|
||||||
|
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
|
||||||
|
pbData := pbChat.GetNewSeqReq{}
|
||||||
|
pbData.UserID = m.SendID
|
||||||
|
pbData.OperationID = m.OperationID
|
||||||
|
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||||
|
if grpcConn == nil {
|
||||||
|
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
|
||||||
|
}
|
||||||
|
msgClient := pbChat.NewChatClient(grpcConn)
|
||||||
|
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.InfoByKv("rpc call success to getNewSeq", pbData.OperationID, "replyData", reply.String())
|
||||||
|
ws.newestSeqResp(conn, m, reply)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) {
|
||||||
|
mReply := make(map[string]interface{})
|
||||||
|
msg := make(map[string]interface{})
|
||||||
|
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||||
|
mReply["msgIncr"] = m.MsgIncr
|
||||||
|
mReply["errCode"] = pb.GetErrCode()
|
||||||
|
mReply["errMsg"] = pb.GetErrMsg()
|
||||||
|
//空切片
|
||||||
|
if v := pb.GetSingleUserMsg(); v != nil {
|
||||||
|
msg["single"] = v
|
||||||
|
} else {
|
||||||
|
msg["single"] = []pbChat.GatherFormat{}
|
||||||
|
}
|
||||||
|
if v := pb.GetGroupUserMsg(); v != nil {
|
||||||
|
msg["group"] = v
|
||||||
|
} else {
|
||||||
|
msg["group"] = []pbChat.GatherFormat{}
|
||||||
|
}
|
||||||
|
msg["maxSeq"] = pb.GetMaxSeq()
|
||||||
|
msg["minSeq"] = pb.GetMinSeq()
|
||||||
|
mReply["data"] = msg
|
||||||
|
ws.sendMsg(conn, mReply)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) {
|
||||||
|
log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m)
|
||||||
|
reply := new(pbChat.PullMessageResp)
|
||||||
|
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg)
|
||||||
|
if isPass {
|
||||||
|
pbData := pbChat.PullMessageReq{}
|
||||||
|
pbData.UserID = m.SendID
|
||||||
|
pbData.OperationID = m.OperationID
|
||||||
|
pbData.SeqBegin = data.(SeqData).SeqBegin
|
||||||
|
pbData.SeqEnd = data.(SeqData).SeqEnd
|
||||||
|
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||||
|
msgClient := pbChat.NewChatClient(grpcConn)
|
||||||
|
reply, err := msgClient.PullMessage(context.Background(), &pbData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
|
||||||
|
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
|
||||||
|
ws.pullMsgResp(conn, m, reply)
|
||||||
|
} else {
|
||||||
|
reply.ErrCode = errCode
|
||||||
|
reply.ErrMsg = errMsg
|
||||||
|
ws.pullMsgResp(conn, m, reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) {
|
||||||
|
mReply := make(map[string]interface{})
|
||||||
|
mReplyData := make(map[string]interface{})
|
||||||
|
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||||
|
mReply["msgIncr"] = m.MsgIncr
|
||||||
|
mReply["errCode"] = pb.GetErrCode()
|
||||||
|
mReply["errMsg"] = pb.GetErrMsg()
|
||||||
|
mReplyData["clientMsgID"] = pb.GetClientMsgID()
|
||||||
|
mReplyData["serverMsgID"] = pb.GetServerMsgID()
|
||||||
|
mReply["data"] = mReplyData
|
||||||
|
ws.sendMsg(conn, mReply)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) {
|
||||||
|
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
|
||||||
|
reply := new(pbChat.UserSendMsgResp)
|
||||||
|
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
||||||
|
if isPass {
|
||||||
|
data := pData.(MsgData)
|
||||||
|
pbData := pbChat.UserSendMsgReq{
|
||||||
|
ReqIdentifier: m.ReqIdentifier,
|
||||||
|
Token: m.Token,
|
||||||
|
SendID: m.SendID,
|
||||||
|
OperationID: m.OperationID,
|
||||||
|
MsgIncr: m.MsgIncr,
|
||||||
|
PlatformID: data.PlatformID,
|
||||||
|
SessionType: data.SessionType,
|
||||||
|
MsgFrom: data.MsgFrom,
|
||||||
|
ContentType: data.ContentType,
|
||||||
|
RecvID: data.RecvID,
|
||||||
|
ForceList: data.ForceList,
|
||||||
|
Content: data.Content,
|
||||||
|
Options: utils.MapToJsonString(data.Options),
|
||||||
|
ClientMsgID: data.ClientMsgID,
|
||||||
|
OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
|
||||||
|
}
|
||||||
|
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
|
||||||
|
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||||
|
client := pbChat.NewChatClient(etcdConn)
|
||||||
|
log.Info("", "", "api UserSendMsg call, api call rpc...")
|
||||||
|
reply, _ := client.UserSendMsg(context.Background(), &pbData)
|
||||||
|
log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
|
||||||
|
ws.sendMsgResp(conn, m, reply)
|
||||||
|
} else {
|
||||||
|
reply.ErrCode = errCode
|
||||||
|
reply.ErrMsg = errMsg
|
||||||
|
ws.sendMsgResp(conn, m, reply)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) {
|
||||||
|
bMsg, _ := json.Marshal(mReply)
|
||||||
|
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) {
|
||||||
|
mReply := make(map[string]interface{})
|
||||||
|
mReply["errCode"] = errCode
|
||||||
|
mReply["errMsg"] = errMsg
|
||||||
|
ws.sendMsg(conn, mReply)
|
||||||
|
}
|
155
src/msg_gateway/gate/rpc_server.go
Normal file
155
src/msg_gateway/gate/rpc_server.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
package gate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/constant"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbRelay "Open_IM/src/proto/relay"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RPCServer struct {
|
||||||
|
rpcPort int
|
||||||
|
rpcRegisterName string
|
||||||
|
etcdSchema string
|
||||||
|
etcdAddr []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RPCServer) onInit(rpcPort int) {
|
||||||
|
r.rpcPort = rpcPort
|
||||||
|
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName
|
||||||
|
r.etcdSchema = config.Config.Etcd.EtcdSchema
|
||||||
|
r.etcdAddr = config.Config.Etcd.EtcdAddr
|
||||||
|
}
|
||||||
|
func (r *RPCServer) run() {
|
||||||
|
ip := utils.ServerIP
|
||||||
|
registerAddress := ip + ":" + utils.IntToString(r.rpcPort)
|
||||||
|
listener, err := net.Listen("tcp", registerAddress)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByArgs(fmt.Sprintf("fail to listening consumer, err:%v\n", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer listener.Close()
|
||||||
|
srv := grpc.NewServer()
|
||||||
|
defer srv.GracefulStop()
|
||||||
|
pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r)
|
||||||
|
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error())
|
||||||
|
}
|
||||||
|
err = srv.Serve(listener)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("push message rpc listening err", "", "err", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbRelay.MsgToUserResp, error) {
|
||||||
|
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
|
||||||
|
var resp []*pbRelay.SingleMsgToUser
|
||||||
|
var RecvID string
|
||||||
|
msg := make(map[string]interface{})
|
||||||
|
mReply := make(map[string]interface{})
|
||||||
|
mReply["reqIdentifier"] = constant.WSPushMsg
|
||||||
|
mReply["errCode"] = 0
|
||||||
|
mReply["errMsg"] = ""
|
||||||
|
msg["sendID"] = in.SendID
|
||||||
|
msg["recvID"] = in.RecvID
|
||||||
|
msg["msgFrom"] = in.MsgFrom
|
||||||
|
msg["contentType"] = in.ContentType
|
||||||
|
msg["sessionType"] = in.SessionType
|
||||||
|
msg["serverMsgID"] = in.ServerMsgID
|
||||||
|
msg["content"] = in.Content
|
||||||
|
msg["seq"] = in.RecvSeq
|
||||||
|
msg["sendTime"] = in.SendTime
|
||||||
|
msg["isEmphasize"] = in.IsEmphasize
|
||||||
|
msg["senderPlatformID"] = in.PlatformID
|
||||||
|
mReply["data"] = msg
|
||||||
|
bMsg, _ := json.Marshal(mReply)
|
||||||
|
switch in.GetContentType() {
|
||||||
|
case constant.SyncSenderMsg:
|
||||||
|
log.InfoByKv("come sync", in.OperationID, "args", in.String())
|
||||||
|
RecvID = in.GetSendID()
|
||||||
|
for key, conn := range ws.wsUserToConn {
|
||||||
|
UIDAndPID := strings.Split(key, " ")
|
||||||
|
if UIDAndPID[0] == RecvID && utils.PlatformIDToName(in.GetPlatformID()) != UIDAndPID[1] {
|
||||||
|
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
|
||||||
|
temp := &pbRelay.SingleMsgToUser{
|
||||||
|
ResultCode: resultCode,
|
||||||
|
RecvID: UIDAndPID[0],
|
||||||
|
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||||
|
}
|
||||||
|
resp = append(resp, temp)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.InfoByKv("not come sync", in.OperationID, "args", in.String())
|
||||||
|
switch in.SessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
log.InfoByKv("come single", in.OperationID, "args", in.String())
|
||||||
|
RecvID = in.GetRecvID()
|
||||||
|
case constant.GroupChatType:
|
||||||
|
RecvID = strings.Split(in.GetRecvID(), " ")[0]
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
log.InfoByKv("come for range", in.OperationID, "args", in.String())
|
||||||
|
|
||||||
|
for key, conn := range ws.wsUserToConn {
|
||||||
|
UIDAndPID := strings.Split(key, " ")
|
||||||
|
if UIDAndPID[0] == RecvID {
|
||||||
|
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
|
||||||
|
temp := &pbRelay.SingleMsgToUser{
|
||||||
|
ResultCode: resultCode,
|
||||||
|
RecvID: UIDAndPID[0],
|
||||||
|
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||||
|
}
|
||||||
|
resp = append(resp, temp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pbRelay.MsgToUserResp{
|
||||||
|
Resp: resp,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//func (r *RPCServer) SendMsgByWS(_ context.Context, in *pbRelay.SendMsgByWSReq) (*pbRelay.MsgToUserResp, error) {
|
||||||
|
// log.InfoByKv("SendMsgByWS is arriving ", in.OperationID, "args", in.String())
|
||||||
|
// resp := new(pbRelay.MsgToUserResp)
|
||||||
|
// MsgId := ws.genMsgNum()
|
||||||
|
// pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||||
|
// pbData.SendID = in.SendID
|
||||||
|
// pbData.RecvID = in.RecvID
|
||||||
|
// pbData.MsgID = MsgId
|
||||||
|
// pbData.SessionType = in.SessionType
|
||||||
|
// pbData.MsgFrom = in.MsgFrom
|
||||||
|
// pbData.Content = in.Content
|
||||||
|
// pbData.ContentType = in.ContentType
|
||||||
|
// pbData.OperationID = in.OperationID
|
||||||
|
// pbData.SendTime = in.SendTime
|
||||||
|
// pbData.PlatformID = in.PlatformID
|
||||||
|
// pKafka.writeMsg(&pbData)
|
||||||
|
// return resp, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
|
||||||
|
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
|
||||||
|
"error", err, "senderPlatform", utils.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
|
||||||
|
ResultCode = -2
|
||||||
|
return ResultCode
|
||||||
|
} else {
|
||||||
|
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String())
|
||||||
|
ResultCode = 0
|
||||||
|
return ResultCode
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
60
src/msg_gateway/gate/validate.go
Normal file
60
src/msg_gateway/gate/validate.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
** description("").
|
||||||
|
** copyright('Open_IM,www.Open_IM.io').
|
||||||
|
** author("fg,Gordon@tuoyun.net").
|
||||||
|
** time(2021/5/21 15:29).
|
||||||
|
*/
|
||||||
|
package gate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/constant"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
"github.com/mitchellh/mapstructure"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Req struct {
|
||||||
|
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
|
||||||
|
Token string `json:"token" validate:"required"`
|
||||||
|
SendID string `json:"sendID" validate:"required"`
|
||||||
|
OperationID string `json:"operationID" validate:"required"`
|
||||||
|
MsgIncr int32 `json:"msgIncr" validate:"required"`
|
||||||
|
Data map[string]interface{} `json:"data"`
|
||||||
|
}
|
||||||
|
type SeqData struct {
|
||||||
|
SeqBegin int64 `mapstructure:"seqBegin" validate:"required"`
|
||||||
|
SeqEnd int64 `mapstructure:"seqEnd" validate:"required"`
|
||||||
|
}
|
||||||
|
type MsgData struct {
|
||||||
|
PlatformID int32 `mapstructure:"platformID" validate:"required"`
|
||||||
|
SessionType int32 `mapstructure:"sessionType" validate:"required"`
|
||||||
|
MsgFrom int32 `mapstructure:"msgFrom" validate:"required"`
|
||||||
|
ContentType int32 `mapstructure:"contentType" validate:"required"`
|
||||||
|
RecvID string `mapstructure:"recvID" validate:"required"`
|
||||||
|
ForceList []string `mapstructure:"forceList" validate:"required"`
|
||||||
|
Content string `mapstructure:"content" validate:"required"`
|
||||||
|
Options map[string]interface{} `mapstructure:"options" validate:"required"`
|
||||||
|
ClientMsgID string `mapstructure:"clientMsgID" validate:"required"`
|
||||||
|
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
|
||||||
|
Ext map[string]interface{} `mapstructure:"ext"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) {
|
||||||
|
switch r {
|
||||||
|
case constant.WSPullMsg:
|
||||||
|
data = SeqData{}
|
||||||
|
case constant.WSSendMsg:
|
||||||
|
data = MsgData{}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
|
||||||
|
log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
|
||||||
|
return false, 203, err.Error(), nil
|
||||||
|
} else if err := validate.Struct(data); err != nil {
|
||||||
|
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
|
||||||
|
return false, 204, err.Error(), nil
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return true, 0, "", data
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
137
src/msg_gateway/gate/ws_server.go
Normal file
137
src/msg_gateway/gate/ws_server.go
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
package gate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WServer struct {
|
||||||
|
wsAddr string
|
||||||
|
wsMaxConnNum int
|
||||||
|
wsUpGrader *websocket.Upgrader
|
||||||
|
wsConnToUser map[*websocket.Conn]string
|
||||||
|
wsUserToConn map[string]*websocket.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) onInit(wsPort int) {
|
||||||
|
ip := utils.ServerIP
|
||||||
|
ws.wsAddr = ip + ":" + utils.IntToString(wsPort)
|
||||||
|
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
|
||||||
|
ws.wsConnToUser = make(map[*websocket.Conn]string)
|
||||||
|
ws.wsUserToConn = make(map[string]*websocket.Conn)
|
||||||
|
ws.wsUpGrader = &websocket.Upgrader{
|
||||||
|
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
|
||||||
|
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
|
||||||
|
CheckOrigin: func(r *http.Request) bool { return true },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) run() {
|
||||||
|
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
|
||||||
|
err := http.ListenAndServe(ws.wsAddr, nil) //Start listening
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("Ws listening err", "", "err", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if ws.headerCheck(w, r) {
|
||||||
|
query := r.URL.Query()
|
||||||
|
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("upgrade http conn err", "", "err", err)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
//Connection mapping relationship,
|
||||||
|
//userID+" "+platformID->conn
|
||||||
|
SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
|
||||||
|
ws.addUserConn(SendID, conn)
|
||||||
|
go ws.readMsg(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) readMsg(conn *websocket.Conn) {
|
||||||
|
for {
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err)
|
||||||
|
ws.delUserConn(conn)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ws.msgParse(conn, msg)
|
||||||
|
//ws.writeMsg(conn, 1, chat)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error {
|
||||||
|
rwLock.Lock()
|
||||||
|
defer rwLock.Unlock()
|
||||||
|
return conn.WriteMessage(a, msg)
|
||||||
|
|
||||||
|
}
|
||||||
|
func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) {
|
||||||
|
rwLock.Lock()
|
||||||
|
defer rwLock.Unlock()
|
||||||
|
ws.wsConnToUser[conn] = uid
|
||||||
|
ws.wsUserToConn[uid] = conn
|
||||||
|
log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) delUserConn(conn *websocket.Conn) {
|
||||||
|
rwLock.Lock()
|
||||||
|
defer rwLock.Unlock()
|
||||||
|
if uid, ok := ws.wsConnToUser[conn]; ok {
|
||||||
|
if _, ok = ws.wsUserToConn[uid]; ok {
|
||||||
|
delete(ws.wsUserToConn, uid)
|
||||||
|
log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "uid", uid)
|
||||||
|
}
|
||||||
|
delete(ws.wsConnToUser, conn)
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WServer) getUserConn(uid string) *websocket.Conn {
|
||||||
|
rwLock.RLock()
|
||||||
|
defer rwLock.RUnlock()
|
||||||
|
if conn, ok := ws.wsUserToConn[uid]; ok {
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (ws *WServer) getUserUid(conn *websocket.Conn) string {
|
||||||
|
rwLock.RLock()
|
||||||
|
defer rwLock.RUnlock()
|
||||||
|
|
||||||
|
if uid, ok := ws.wsConnToUser[conn]; ok {
|
||||||
|
return uid
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool {
|
||||||
|
status := http.StatusUnauthorized
|
||||||
|
query := r.URL.Query()
|
||||||
|
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
|
||||||
|
if !utils.VerifyToken(query["token"][0], query["sendID"][0]) {
|
||||||
|
log.ErrorByKv("Token verify failed", "", "query", query)
|
||||||
|
w.Header().Set("Sec-Websocket-Version", "13")
|
||||||
|
http.Error(w, http.StatusText(status), status)
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
log.InfoByKv("Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.ErrorByKv("Args err", "", "query", query)
|
||||||
|
w.Header().Set("Sec-Websocket-Version", "13")
|
||||||
|
http.Error(w, http.StatusText(status), status)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
18
src/msg_gateway/open_im_msg_gateway.go
Normal file
18
src/msg_gateway/open_im_msg_gateway.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/msg_gateway/gate"
|
||||||
|
"flag"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
rpcPort := flag.Int("rpc_port", 10500, "rpc listening port")
|
||||||
|
wsPort := flag.Int("ws_port", 10800, "rpc listening port")
|
||||||
|
flag.Parse()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
gate.Init(*rpcPort, *wsPort)
|
||||||
|
gate.Run()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
26
src/msg_transfer/Makefile
Normal file
26
src/msg_transfer/Makefile
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
.PHONY: all build run gotool install clean help
|
||||||
|
|
||||||
|
BINARY_NAME=open_im_msg_transfer
|
||||||
|
BIN_DIR=../../bin/
|
||||||
|
LAN_FILE=.go
|
||||||
|
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||||
|
|
||||||
|
all: gotool build
|
||||||
|
|
||||||
|
build:
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
|
||||||
|
|
||||||
|
run:
|
||||||
|
@go run ./
|
||||||
|
|
||||||
|
gotool:
|
||||||
|
go fmt ./
|
||||||
|
go vet ./
|
||||||
|
|
||||||
|
install:
|
||||||
|
make build
|
||||||
|
mv ${BINARY_NAME} ${BIN_DIR}
|
||||||
|
|
||||||
|
clean:
|
||||||
|
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
||||||
|
|
20
src/msg_transfer/logic/db.go
Normal file
20
src/msg_transfer/logic/db.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/db"
|
||||||
|
"Open_IM/src/common/db/mysql_model/im_mysql_model"
|
||||||
|
pbMsg "Open_IM/src/proto/chat"
|
||||||
|
)
|
||||||
|
|
||||||
|
func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error {
|
||||||
|
seq, err := db.DB.IncrUserSeq(uid)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pbMsg.RecvSeq = seq
|
||||||
|
return db.DB.SaveUserChat(uid, pbMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getGroupList(groupID string) ([]string, error) {
|
||||||
|
return im_mysql_model.SelectGroupList(groupID)
|
||||||
|
}
|
178
src/msg_transfer/logic/history_msg_handler.go
Normal file
178
src/msg_transfer/logic/history_msg_handler.go
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/constant"
|
||||||
|
kfk "Open_IM/src/common/kafka"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbMsg "Open_IM/src/proto/chat"
|
||||||
|
pb "Open_IM/src/proto/group"
|
||||||
|
pbPush "Open_IM/src/proto/push"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"context"
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fcb func(msg []byte, msgKey string)
|
||||||
|
|
||||||
|
type HistoryConsumerHandler struct {
|
||||||
|
msgHandle map[string]fcb
|
||||||
|
historyConsumerGroup *kfk.MConsumerGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *HistoryConsumerHandler) Init() {
|
||||||
|
mc.msgHandle = make(map[string]fcb)
|
||||||
|
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||||
|
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||||
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||||
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
||||||
|
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
|
||||||
|
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||||
|
err := proto.Unmarshal(msg, &pbData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{}
|
||||||
|
pbSaveData.SendID = pbData.SendID
|
||||||
|
pbSaveData.SendTime = pbData.SendTime
|
||||||
|
pbSaveData.Content = pbData.Content
|
||||||
|
pbSaveData.MsgFrom = pbData.MsgFrom
|
||||||
|
pbSaveData.ContentType = pbData.ContentType
|
||||||
|
pbSaveData.SessionType = pbData.SessionType
|
||||||
|
pbSaveData.MsgID = pbData.MsgID
|
||||||
|
pbSaveData.RecvID = pbData.RecvID
|
||||||
|
pbSaveData.PlatformID = pbData.PlatformID
|
||||||
|
Options := utils.JsonStringToMap(pbData.Options)
|
||||||
|
//Control whether to store offline messages (mongo)
|
||||||
|
isHistory := utils.GetSwitchFromOptions(Options, "history")
|
||||||
|
//Control whether to store history messages (mysql)
|
||||||
|
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
||||||
|
//Control whether to push message to sender's other terminal
|
||||||
|
isSenderSync := utils.GetSwitchFromOptions(Options, "senderSync")
|
||||||
|
if pbData.SessionType == constant.SingleChatType {
|
||||||
|
log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist, isSenderSync)
|
||||||
|
if isHistory {
|
||||||
|
if msgKey == pbSaveData.RecvID {
|
||||||
|
err := saveUserChat(pbData.RecvID, &pbSaveData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
||||||
|
}
|
||||||
|
pbSaveData.Options = pbData.Options
|
||||||
|
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||||
|
sendMessageToPush(&pbSaveData)
|
||||||
|
} else if msgKey == pbSaveData.SendID {
|
||||||
|
err := saveUserChat(pbData.SendID, &pbSaveData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
||||||
|
}
|
||||||
|
if isSenderSync {
|
||||||
|
pbSaveData.ContentType = constant.SyncSenderMsg
|
||||||
|
log.WarnByKv("SyncSenderMsg come here", pbData.OperationID, pbSaveData.String())
|
||||||
|
sendMessageToPush(&pbSaveData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
} else if pbData.SessionType == constant.GroupChatType {
|
||||||
|
log.Info("", "", "msg_transfer chat type = GroupChatType")
|
||||||
|
|
||||||
|
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
|
||||||
|
client := pb.NewGroupClient(etcdConn)
|
||||||
|
req := &pb.GetGroupInfoReq{
|
||||||
|
GroupID: pbSaveData.RecvID,
|
||||||
|
Token: pbData.Token,
|
||||||
|
OperationID: pbSaveData.OperationID,
|
||||||
|
}
|
||||||
|
log.Info("", "", "msg_transfer call group rpc, data = %s", req.String())
|
||||||
|
reply, err := client.GetGroupInfo(context.Background(), req)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("", "", "msg_transfer client.GetGroupInfo fail, err = %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, v := range reply.GroupMemberList {
|
||||||
|
//Store RecvID is userID+" "+groupID when chatType is Group
|
||||||
|
pbSaveData.RecvID = v.UserID + " " + pbSaveData.RecvID
|
||||||
|
if isHistory {
|
||||||
|
saveUserChat(v.UserID, &pbSaveData)
|
||||||
|
}
|
||||||
|
pbSaveData.Options = pbData.Options
|
||||||
|
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||||
|
if v.UserID != pbSaveData.SendID {
|
||||||
|
if utils.IsContain(v.UserID, pbData.ForceList) {
|
||||||
|
pbSaveData.IsEmphasize = true
|
||||||
|
}
|
||||||
|
sendMessageToPush(&pbSaveData)
|
||||||
|
} else {
|
||||||
|
if isSenderSync {
|
||||||
|
pbSaveData.ContentType = constant.SyncSenderMsg
|
||||||
|
sendMessageToPush(&pbSaveData)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.InfoByKv("msg_transfer handle topic success...", "", "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||||
|
claim sarama.ConsumerGroupClaim) error {
|
||||||
|
for msg := range claim.Messages() {
|
||||||
|
log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
|
||||||
|
mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) {
|
||||||
|
msg := pbPush.PushMsgReq{}
|
||||||
|
msg.OperationID = message.OperationID
|
||||||
|
msg.PlatformID = message.PlatformID
|
||||||
|
msg.Content = message.Content
|
||||||
|
msg.ContentType = message.ContentType
|
||||||
|
msg.SessionType = message.SessionType
|
||||||
|
msg.RecvID = message.RecvID
|
||||||
|
msg.SendID = message.SendID
|
||||||
|
msg.IsEmphasize = message.IsEmphasize
|
||||||
|
msg.MsgFrom = message.MsgFrom
|
||||||
|
msg.Options = message.Options
|
||||||
|
msg.RecvSeq = message.RecvSeq
|
||||||
|
msg.SendTime = message.SendTime
|
||||||
|
msg.MsgID = message.MsgID
|
||||||
|
msg.OfflineInfo = message.OfflineInfo
|
||||||
|
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
|
||||||
|
if grpcConn == nil {
|
||||||
|
log.ErrorByKv("rpc dial failed", msg.OperationID, "push data", msg.String())
|
||||||
|
pid, offset, err := producer.SendMessage(message)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
|
||||||
|
_, err := msgClient.PushMsg(context.Background(), &msg)
|
||||||
|
defer grpcConn.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error())
|
||||||
|
pid, offset, err := producer.SendMessage(message)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.InfoByKv("rpc send success", msg.OperationID, "push data", msg.String())
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
25
src/msg_transfer/logic/init.go
Normal file
25
src/msg_transfer/logic/init.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/kafka"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
persistentCH PersistentConsumerHandler
|
||||||
|
historyCH HistoryConsumerHandler
|
||||||
|
producer *kafka.Producer
|
||||||
|
)
|
||||||
|
|
||||||
|
func Init() {
|
||||||
|
log.NewPrivateLog(config.Config.ModuleName.MsgTransferName)
|
||||||
|
persistentCH.Init()
|
||||||
|
historyCH.Init()
|
||||||
|
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
|
||||||
|
}
|
||||||
|
func Run() {
|
||||||
|
//register mysqlConsumerHandler to
|
||||||
|
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||||
|
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||||
|
}
|
63
src/msg_transfer/logic/persistent_msg_handler.go
Normal file
63
src/msg_transfer/logic/persistent_msg_handler.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
** description("").
|
||||||
|
** copyright('tuoyun,www.tuoyun.net').
|
||||||
|
** author("fg,Gordon@tuoyun.net").
|
||||||
|
** time(2021/5/11 15:37).
|
||||||
|
*/
|
||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/db/mysql_model/im_mysql_msg_model"
|
||||||
|
kfk "Open_IM/src/common/kafka"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbMsg "Open_IM/src/proto/chat"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PersistentConsumerHandler struct {
|
||||||
|
msgHandle map[string]fcb
|
||||||
|
persistentConsumerGroup *kfk.MConsumerGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *PersistentConsumerHandler) Init() {
|
||||||
|
pc.msgHandle = make(map[string]fcb)
|
||||||
|
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
|
||||||
|
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||||
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||||
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||||
|
|
||||||
|
}
|
||||||
|
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) {
|
||||||
|
log.InfoByKv("chat come here mysql!!!", "", "chat", string(msg))
|
||||||
|
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||||
|
err := proto.Unmarshal(msg, &pbData)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
Options := utils.JsonStringToMap(pbData.Options)
|
||||||
|
//Control whether to store history messages (mysql)
|
||||||
|
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
||||||
|
//Only process receiver data
|
||||||
|
if isPersist && msgKey == pbData.RecvID {
|
||||||
|
log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
|
||||||
|
if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
|
||||||
|
log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||||
|
claim sarama.ConsumerGroupClaim) error {
|
||||||
|
for msg := range claim.Messages() {
|
||||||
|
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
|
||||||
|
pc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
14
src/msg_transfer/open_im_msg_transfer.go
Normal file
14
src/msg_transfer/open_im_msg_transfer.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/msg_transfer/logic"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
logic.Init()
|
||||||
|
logic.Run()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
26
src/rpc/auth/Makefile
Normal file
26
src/rpc/auth/Makefile
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
.PHONY: all build run gotool install clean help
|
||||||
|
|
||||||
|
BINARY_NAME=open_im_auth
|
||||||
|
BIN_DIR=../../../bin/
|
||||||
|
LAN_FILE=.go
|
||||||
|
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||||
|
|
||||||
|
all: gotool build
|
||||||
|
|
||||||
|
build:
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ${BINARY_NAME} ${GO_FILE}
|
||||||
|
|
||||||
|
run:
|
||||||
|
@go run ./
|
||||||
|
|
||||||
|
gotool:
|
||||||
|
go fmt ./
|
||||||
|
go vet ./
|
||||||
|
|
||||||
|
install:
|
||||||
|
make build
|
||||||
|
mv ${BINARY_NAME} ${BIN_DIR}
|
||||||
|
|
||||||
|
clean:
|
||||||
|
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
||||||
|
|
61
src/rpc/auth/auth/rpcAuth.go
Normal file
61
src/rpc/auth/auth/rpcAuth.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package rpcAuth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
log2 "Open_IM/src/common/log"
|
||||||
|
pbAuth "Open_IM/src/proto/auth"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcAuth struct {
|
||||||
|
rpcPort int
|
||||||
|
rpcRegisterName string
|
||||||
|
etcdSchema string
|
||||||
|
etcdAddr []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRpcAuthServer(port int) *rpcAuth {
|
||||||
|
return &rpcAuth{
|
||||||
|
rpcPort: port,
|
||||||
|
rpcRegisterName: config.Config.RpcRegisterName.RpcGetTokenName,
|
||||||
|
etcdSchema: config.Config.Etcd.EtcdSchema,
|
||||||
|
etcdAddr: config.Config.Etcd.EtcdAddr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rpc *rpcAuth) Run() {
|
||||||
|
log2.Info("", "", "rpc get_token init...")
|
||||||
|
|
||||||
|
address := utils.ServerIP + ":" + strconv.Itoa(rpc.rpcPort)
|
||||||
|
listener, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
log2.Error("", "", "listen network failed, err = %s, address = %s", err.Error(), address)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log2.Info("", "", "listen network success, address = %s", address)
|
||||||
|
|
||||||
|
//grpc server
|
||||||
|
srv := grpc.NewServer()
|
||||||
|
defer srv.GracefulStop()
|
||||||
|
|
||||||
|
//service registers with etcd
|
||||||
|
|
||||||
|
pbAuth.RegisterAuthServer(srv, rpc)
|
||||||
|
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10)
|
||||||
|
if err != nil {
|
||||||
|
log2.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = srv.Serve(listener)
|
||||||
|
if err != nil {
|
||||||
|
log2.Info("", "", "rpc get_token fail, err = %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log2.Info("", "", "rpc get_token init success")
|
||||||
|
}
|
20
src/rpc/auth/auth/user_register.go
Normal file
20
src/rpc/auth/auth/user_register.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package rpcAuth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/db/mysql_model/im_mysql_model"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbAuth "Open_IM/src/proto/auth"
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (rpc *rpcAuth) UserRegister(_ context.Context, pb *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) {
|
||||||
|
log.Info("", "", "rpc user_register start, [data: %s]", pb.String())
|
||||||
|
|
||||||
|
if err := im_mysql_model.UserRegister(pb); err != nil {
|
||||||
|
log.Error("", "", "rpc user_register error, [data: %s] [err: %s]", pb.String(), err.Error())
|
||||||
|
return &pbAuth.UserRegisterResp{Success: false}, err
|
||||||
|
}
|
||||||
|
log.Info("", "", "rpc user_register success return")
|
||||||
|
|
||||||
|
return &pbAuth.UserRegisterResp{Success: true}, nil
|
||||||
|
}
|
29
src/rpc/auth/auth/user_token.go
Normal file
29
src/rpc/auth/auth/user_token.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package rpcAuth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/db/mysql_model/im_mysql_model"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbAuth "Open_IM/src/proto/auth"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (rpc *rpcAuth) UserToken(_ context.Context, pb *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
|
||||||
|
log.Info("", "", "rpc user_token call start..., [pbTokenReq: %s]", pb.String())
|
||||||
|
|
||||||
|
_, err := im_mysql_model.FindUserByUID(pb.UID)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("", "", "rpc user_token call..., im_mysql_model.AppServerFindFromUserByUserID fail [uid: %s] [err: %s]", pb.UID, err.Error())
|
||||||
|
return &pbAuth.UserTokenResp{ErrCode: 500, ErrMsg: err.Error()}, err
|
||||||
|
}
|
||||||
|
log.Info("", "", "rpc user_token call..., im_mysql_model.AppServerFindFromUserByUserID")
|
||||||
|
|
||||||
|
tokens, expTime, err := utils.CreateToken(pb.UID, "", pb.Platform)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("", "", "rpc user_token call..., utils.CreateToken fail [uid: %s] [err: %s]", pb.UID, err.Error())
|
||||||
|
return &pbAuth.UserTokenResp{ErrCode: 500, ErrMsg: err.Error()}, err
|
||||||
|
}
|
||||||
|
log.Info("", "", "rpc user_token success return, [uid: %s] [tokens: %s]", pb.UID, tokens)
|
||||||
|
|
||||||
|
return &pbAuth.UserTokenResp{Token: tokens, ExpiredTime: expTime}, nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user