From f686fbb4ee035b1026111634a1c13d4a1b37ccfa Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 26 May 2021 19:24:25 +0800 Subject: [PATCH] msg_gateway and msg_transfer modules --- src/msg_gateway/Makefile | 26 +++ src/msg_gateway/gate/init.go | 29 +++ src/msg_gateway/gate/logic.go | 197 ++++++++++++++++++ src/msg_gateway/gate/rpc_server.go | 155 ++++++++++++++ src/msg_gateway/gate/validate.go | 60 ++++++ src/msg_gateway/gate/ws_server.go | 137 ++++++++++++ src/msg_gateway/open_im_msg_gateway.go | 18 ++ src/msg_transfer/Makefile | 26 +++ src/msg_transfer/logic/db.go | 20 ++ src/msg_transfer/logic/history_msg_handler.go | 178 ++++++++++++++++ src/msg_transfer/logic/init.go | 25 +++ .../logic/persistent_msg_handler.go | 63 ++++++ src/msg_transfer/open_im_msg_transfer.go | 14 ++ 13 files changed, 948 insertions(+) create mode 100644 src/msg_gateway/Makefile create mode 100644 src/msg_gateway/gate/init.go create mode 100644 src/msg_gateway/gate/logic.go create mode 100644 src/msg_gateway/gate/rpc_server.go create mode 100644 src/msg_gateway/gate/validate.go create mode 100644 src/msg_gateway/gate/ws_server.go create mode 100644 src/msg_gateway/open_im_msg_gateway.go create mode 100644 src/msg_transfer/Makefile create mode 100644 src/msg_transfer/logic/db.go create mode 100644 src/msg_transfer/logic/history_msg_handler.go create mode 100644 src/msg_transfer/logic/init.go create mode 100644 src/msg_transfer/logic/persistent_msg_handler.go create mode 100644 src/msg_transfer/open_im_msg_transfer.go diff --git a/src/msg_gateway/Makefile b/src/msg_gateway/Makefile new file mode 100644 index 000000000..1b6e67e9c --- /dev/null +++ b/src/msg_gateway/Makefile @@ -0,0 +1,26 @@ +.PHONY: all build run gotool install clean help + +BINARY_NAME=open_im_msg_gateway +BIN_DIR=../../bin/ +LAN_FILE=.go +GO_FILE:=${BINARY_NAME}${LAN_FILE} + +all: gotool build + +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE} + +run: + @go run ./ + +gotool: + go fmt ./ + go vet ./ + +install: + make build + mv ${BINARY_NAME} ${BIN_DIR} + +clean: + @if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi + diff --git a/src/msg_gateway/gate/init.go b/src/msg_gateway/gate/init.go new file mode 100644 index 000000000..510cdaefc --- /dev/null +++ b/src/msg_gateway/gate/init.go @@ -0,0 +1,29 @@ +package gate + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + "github.com/go-playground/validator/v10" + "sync" +) + +var ( + rwLock *sync.RWMutex + validate *validator.Validate + ws WServer + rpcSvr RPCServer +) + +func Init(rpcPort, wsPort int) { + //log initialization + log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName) + rwLock = new(sync.RWMutex) + validate = validator.New() + ws.onInit(wsPort) + rpcSvr.onInit(rpcPort) +} + +func Run() { + go ws.run() + go rpcSvr.run() +} diff --git a/src/msg_gateway/gate/logic.go b/src/msg_gateway/gate/logic.go new file mode 100644 index 000000000..770592b2a --- /dev/null +++ b/src/msg_gateway/gate/logic.go @@ -0,0 +1,197 @@ +package gate + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/constant" + "Open_IM/src/common/log" + pbChat "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "context" + "encoding/json" + "github.com/gorilla/websocket" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "strings" +) + +func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) { + //ws online debug data + //{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0} + //{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6} + //{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b", + //"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID": + //"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"} + m := Req{} + if err := json.Unmarshal(jsonMsg, &m); err != nil { + log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error()) + ws.sendErrMsg(conn, 200, err.Error()) + return + } + if err := validate.Struct(m); err != nil { + log.ErrorByKv("ws args validate err", "", "err", err.Error()) + ws.sendErrMsg(conn, 201, err.Error()) + return + } + + if !utils.VerifyToken(m.Token, m.SendID) { + ws.sendErrMsg(conn, 202, "token validate err") + return + } + log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) + + switch m.ReqIdentifier { + case constant.WSGetNewestSeq: + ws.newestSeqReq(conn, &m) + case constant.WSPullMsg: + ws.pullMsgReq(conn, &m) + case constant.WSSendMsg: + ws.sendMsgReq(conn, &m) + default: + } +} +func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) { + mReply := make(map[string]interface{}) + mData := make(map[string]interface{}) + mReply["reqIdentifier"] = m.ReqIdentifier + mReply["msgIncr"] = m.MsgIncr + mReply["errCode"] = pb.GetErrCode() + mReply["errMsg"] = pb.GetErrMsg() + mData["seq"] = pb.GetSeq() + mReply["data"] = mData + ws.sendMsg(conn, mReply) +} +func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) { + log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m) + pbData := pbChat.GetNewSeqReq{} + pbData.UserID = m.SendID + pbData.OperationID = m.OperationID + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + if grpcConn == nil { + log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m) + } + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.GetNewSeq(context.Background(), &pbData) + if err != nil { + log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String()) + return + } + log.InfoByKv("rpc call success to getNewSeq", pbData.OperationID, "replyData", reply.String()) + ws.newestSeqResp(conn, m, reply) + +} + +func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) { + mReply := make(map[string]interface{}) + msg := make(map[string]interface{}) + mReply["reqIdentifier"] = m.ReqIdentifier + mReply["msgIncr"] = m.MsgIncr + mReply["errCode"] = pb.GetErrCode() + mReply["errMsg"] = pb.GetErrMsg() + //空切片 + if v := pb.GetSingleUserMsg(); v != nil { + msg["single"] = v + } else { + msg["single"] = []pbChat.GatherFormat{} + } + if v := pb.GetGroupUserMsg(); v != nil { + msg["group"] = v + } else { + msg["group"] = []pbChat.GatherFormat{} + } + msg["maxSeq"] = pb.GetMaxSeq() + msg["minSeq"] = pb.GetMinSeq() + mReply["data"] = msg + ws.sendMsg(conn, mReply) + +} + +func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) { + log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m) + reply := new(pbChat.PullMessageResp) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg) + if isPass { + pbData := pbChat.PullMessageReq{} + pbData.UserID = m.SendID + pbData.OperationID = m.OperationID + pbData.SeqBegin = data.(SeqData).SeqBegin + pbData.SeqEnd = data.(SeqData).SeqEnd + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.PullMessage(context.Background(), &pbData) + if err != nil { + log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error()) + return + } + log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(), + "MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg())) + ws.pullMsgResp(conn, m, reply) + } else { + reply.ErrCode = errCode + reply.ErrMsg = errMsg + ws.pullMsgResp(conn, m, reply) + } +} + +func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) { + mReply := make(map[string]interface{}) + mReplyData := make(map[string]interface{}) + mReply["reqIdentifier"] = m.ReqIdentifier + mReply["msgIncr"] = m.MsgIncr + mReply["errCode"] = pb.GetErrCode() + mReply["errMsg"] = pb.GetErrMsg() + mReplyData["clientMsgID"] = pb.GetClientMsgID() + mReplyData["serverMsgID"] = pb.GetServerMsgID() + mReply["data"] = mReplyData + ws.sendMsg(conn, mReply) +} + +func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) { + log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) + reply := new(pbChat.UserSendMsgResp) + isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) + if isPass { + data := pData.(MsgData) + pbData := pbChat.UserSendMsgReq{ + ReqIdentifier: m.ReqIdentifier, + Token: m.Token, + SendID: m.SendID, + OperationID: m.OperationID, + MsgIncr: m.MsgIncr, + PlatformID: data.PlatformID, + SessionType: data.SessionType, + MsgFrom: data.MsgFrom, + ContentType: data.ContentType, + RecvID: data.RecvID, + ForceList: data.ForceList, + Content: data.Content, + Options: utils.MapToJsonString(data.Options), + ClientMsgID: data.ClientMsgID, + OffLineInfo: utils.MapToJsonString(data.OfflineInfo), + } + log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + client := pbChat.NewChatClient(etcdConn) + log.Info("", "", "api UserSendMsg call, api call rpc...") + reply, _ := client.UserSendMsg(context.Background(), &pbData) + log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String()) + ws.sendMsgResp(conn, m, reply) + } else { + reply.ErrCode = errCode + reply.ErrMsg = errMsg + ws.sendMsgResp(conn, m, reply) + } + +} + +func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) { + bMsg, _ := json.Marshal(mReply) + err := ws.writeMsg(conn, websocket.TextMessage, bMsg) + if err != nil { + log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply) + } +} +func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) { + mReply := make(map[string]interface{}) + mReply["errCode"] = errCode + mReply["errMsg"] = errMsg + ws.sendMsg(conn, mReply) +} diff --git a/src/msg_gateway/gate/rpc_server.go b/src/msg_gateway/gate/rpc_server.go new file mode 100644 index 000000000..be80d080c --- /dev/null +++ b/src/msg_gateway/gate/rpc_server.go @@ -0,0 +1,155 @@ +package gate + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/constant" + "Open_IM/src/common/log" + pbRelay "Open_IM/src/proto/relay" + "Open_IM/src/utils" + "context" + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "google.golang.org/grpc" + "net" + "strings" +) + +type RPCServer struct { + rpcPort int + rpcRegisterName string + etcdSchema string + etcdAddr []string +} + +func (r *RPCServer) onInit(rpcPort int) { + r.rpcPort = rpcPort + r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName + r.etcdSchema = config.Config.Etcd.EtcdSchema + r.etcdAddr = config.Config.Etcd.EtcdAddr +} +func (r *RPCServer) run() { + ip := utils.ServerIP + registerAddress := ip + ":" + utils.IntToString(r.rpcPort) + listener, err := net.Listen("tcp", registerAddress) + if err != nil { + log.ErrorByArgs(fmt.Sprintf("fail to listening consumer, err:%v\n", err)) + return + } + defer listener.Close() + srv := grpc.NewServer() + defer srv.GracefulStop() + pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r) + err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10) + if err != nil { + log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error()) + } + err = srv.Serve(listener) + if err != nil { + log.ErrorByKv("push message rpc listening err", "", "err", err.Error()) + return + } +} +func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbRelay.MsgToUserResp, error) { + log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String()) + var resp []*pbRelay.SingleMsgToUser + var RecvID string + msg := make(map[string]interface{}) + mReply := make(map[string]interface{}) + mReply["reqIdentifier"] = constant.WSPushMsg + mReply["errCode"] = 0 + mReply["errMsg"] = "" + msg["sendID"] = in.SendID + msg["recvID"] = in.RecvID + msg["msgFrom"] = in.MsgFrom + msg["contentType"] = in.ContentType + msg["sessionType"] = in.SessionType + msg["serverMsgID"] = in.ServerMsgID + msg["content"] = in.Content + msg["seq"] = in.RecvSeq + msg["sendTime"] = in.SendTime + msg["isEmphasize"] = in.IsEmphasize + msg["senderPlatformID"] = in.PlatformID + mReply["data"] = msg + bMsg, _ := json.Marshal(mReply) + switch in.GetContentType() { + case constant.SyncSenderMsg: + log.InfoByKv("come sync", in.OperationID, "args", in.String()) + RecvID = in.GetSendID() + for key, conn := range ws.wsUserToConn { + UIDAndPID := strings.Split(key, " ") + if UIDAndPID[0] == RecvID && utils.PlatformIDToName(in.GetPlatformID()) != UIDAndPID[1] { + resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0]) + temp := &pbRelay.SingleMsgToUser{ + ResultCode: resultCode, + RecvID: UIDAndPID[0], + RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]), + } + resp = append(resp, temp) + } + + } + default: + log.InfoByKv("not come sync", in.OperationID, "args", in.String()) + switch in.SessionType { + case constant.SingleChatType: + log.InfoByKv("come single", in.OperationID, "args", in.String()) + RecvID = in.GetRecvID() + case constant.GroupChatType: + RecvID = strings.Split(in.GetRecvID(), " ")[0] + default: + } + log.InfoByKv("come for range", in.OperationID, "args", in.String()) + + for key, conn := range ws.wsUserToConn { + UIDAndPID := strings.Split(key, " ") + if UIDAndPID[0] == RecvID { + resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0]) + temp := &pbRelay.SingleMsgToUser{ + ResultCode: resultCode, + RecvID: UIDAndPID[0], + RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]), + } + resp = append(resp, temp) + } + } + } + return &pbRelay.MsgToUserResp{ + Resp: resp, + }, nil +} + +//func (r *RPCServer) SendMsgByWS(_ context.Context, in *pbRelay.SendMsgByWSReq) (*pbRelay.MsgToUserResp, error) { +// log.InfoByKv("SendMsgByWS is arriving ", in.OperationID, "args", in.String()) +// resp := new(pbRelay.MsgToUserResp) +// MsgId := ws.genMsgNum() +// pbData := pbMsg.WSToMsgSvrChatMsg{} +// pbData.SendID = in.SendID +// pbData.RecvID = in.RecvID +// pbData.MsgID = MsgId +// pbData.SessionType = in.SessionType +// pbData.MsgFrom = in.MsgFrom +// pbData.Content = in.Content +// pbData.ContentType = in.ContentType +// pbData.OperationID = in.OperationID +// pbData.SendTime = in.SendTime +// pbData.PlatformID = in.PlatformID +// pKafka.writeMsg(&pbData) +// return resp, nil +//} + +func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) { + err := ws.writeMsg(conn, websocket.TextMessage, bMsg) + if err != nil { + log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(), + "error", err, "senderPlatform", utils.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID) + ResultCode = -2 + return ResultCode + } else { + log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String()) + ResultCode = 0 + return ResultCode + } + +} diff --git a/src/msg_gateway/gate/validate.go b/src/msg_gateway/gate/validate.go new file mode 100644 index 000000000..a7a75f8e8 --- /dev/null +++ b/src/msg_gateway/gate/validate.go @@ -0,0 +1,60 @@ +/* +** description(""). +** copyright('Open_IM,www.Open_IM.io'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/5/21 15:29). + */ +package gate + +import ( + "Open_IM/src/common/constant" + "Open_IM/src/common/log" + "github.com/mitchellh/mapstructure" +) + +type Req struct { + ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` + Token string `json:"token" validate:"required"` + SendID string `json:"sendID" validate:"required"` + OperationID string `json:"operationID" validate:"required"` + MsgIncr int32 `json:"msgIncr" validate:"required"` + Data map[string]interface{} `json:"data"` +} +type SeqData struct { + SeqBegin int64 `mapstructure:"seqBegin" validate:"required"` + SeqEnd int64 `mapstructure:"seqEnd" validate:"required"` +} +type MsgData struct { + PlatformID int32 `mapstructure:"platformID" validate:"required"` + SessionType int32 `mapstructure:"sessionType" validate:"required"` + MsgFrom int32 `mapstructure:"msgFrom" validate:"required"` + ContentType int32 `mapstructure:"contentType" validate:"required"` + RecvID string `mapstructure:"recvID" validate:"required"` + ForceList []string `mapstructure:"forceList" validate:"required"` + Content string `mapstructure:"content" validate:"required"` + Options map[string]interface{} `mapstructure:"options" validate:"required"` + ClientMsgID string `mapstructure:"clientMsgID" validate:"required"` + OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"` + Ext map[string]interface{} `mapstructure:"ext"` +} + +func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) { + switch r { + case constant.WSPullMsg: + data = SeqData{} + case constant.WSSendMsg: + data = MsgData{} + default: + } + if err := mapstructure.WeakDecode(m.Data, &data); err != nil { + log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r) + return false, 203, err.Error(), nil + } else if err := validate.Struct(data); err != nil { + log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r) + return false, 204, err.Error(), nil + + } else { + return true, 0, "", data + } + +} diff --git a/src/msg_gateway/gate/ws_server.go b/src/msg_gateway/gate/ws_server.go new file mode 100644 index 000000000..ef9d126d6 --- /dev/null +++ b/src/msg_gateway/gate/ws_server.go @@ -0,0 +1,137 @@ +package gate + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + "Open_IM/src/utils" + "github.com/gorilla/websocket" + "net/http" + "time" +) + +type WServer struct { + wsAddr string + wsMaxConnNum int + wsUpGrader *websocket.Upgrader + wsConnToUser map[*websocket.Conn]string + wsUserToConn map[string]*websocket.Conn +} + +func (ws *WServer) onInit(wsPort int) { + ip := utils.ServerIP + ws.wsAddr = ip + ":" + utils.IntToString(wsPort) + ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum + ws.wsConnToUser = make(map[*websocket.Conn]string) + ws.wsUserToConn = make(map[string]*websocket.Conn) + ws.wsUpGrader = &websocket.Upgrader{ + HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, + ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, + CheckOrigin: func(r *http.Request) bool { return true }, + } +} + +func (ws *WServer) run() { + http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler + err := http.ListenAndServe(ws.wsAddr, nil) //Start listening + if err != nil { + log.ErrorByKv("Ws listening err", "", "err", err.Error()) + } +} + +func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { + if ws.headerCheck(w, r) { + query := r.URL.Query() + conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator + if err != nil { + log.ErrorByKv("upgrade http conn err", "", "err", err) + return + } else { + //Connection mapping relationship, + //userID+" "+platformID->conn + SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0]))) + ws.addUserConn(SendID, conn) + go ws.readMsg(conn) + } + } +} + +func (ws *WServer) readMsg(conn *websocket.Conn) { + for { + _, msg, err := conn.ReadMessage() + if err != nil { + log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err) + ws.delUserConn(conn) + return + } + ws.msgParse(conn, msg) + //ws.writeMsg(conn, 1, chat) + } + +} + +func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error { + rwLock.Lock() + defer rwLock.Unlock() + return conn.WriteMessage(a, msg) + +} +func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) { + rwLock.Lock() + defer rwLock.Unlock() + ws.wsConnToUser[conn] = uid + ws.wsUserToConn[uid] = conn + log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid) + +} + +func (ws *WServer) delUserConn(conn *websocket.Conn) { + rwLock.Lock() + defer rwLock.Unlock() + if uid, ok := ws.wsConnToUser[conn]; ok { + if _, ok = ws.wsUserToConn[uid]; ok { + delete(ws.wsUserToConn, uid) + log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "uid", uid) + } + delete(ws.wsConnToUser, conn) + } + conn.Close() +} + +func (ws *WServer) getUserConn(uid string) *websocket.Conn { + rwLock.RLock() + defer rwLock.RUnlock() + if conn, ok := ws.wsUserToConn[uid]; ok { + return conn + } + return nil +} +func (ws *WServer) getUserUid(conn *websocket.Conn) string { + rwLock.RLock() + defer rwLock.RUnlock() + + if uid, ok := ws.wsConnToUser[conn]; ok { + return uid + } + return "" +} +func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool { + status := http.StatusUnauthorized + query := r.URL.Query() + if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 { + if !utils.VerifyToken(query["token"][0], query["sendID"][0]) { + log.ErrorByKv("Token verify failed", "", "query", query) + w.Header().Set("Sec-Websocket-Version", "13") + http.Error(w, http.StatusText(status), status) + return false + } else { + log.InfoByKv("Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0]) + return true + } + } else { + log.ErrorByKv("Args err", "", "query", query) + w.Header().Set("Sec-Websocket-Version", "13") + http.Error(w, http.StatusText(status), status) + return false + } + +} diff --git a/src/msg_gateway/open_im_msg_gateway.go b/src/msg_gateway/open_im_msg_gateway.go new file mode 100644 index 000000000..b125d0ce4 --- /dev/null +++ b/src/msg_gateway/open_im_msg_gateway.go @@ -0,0 +1,18 @@ +package main + +import ( + "Open_IM/src/msg_gateway/gate" + "flag" + "sync" +) + +func main() { + rpcPort := flag.Int("rpc_port", 10500, "rpc listening port") + wsPort := flag.Int("ws_port", 10800, "rpc listening port") + flag.Parse() + var wg sync.WaitGroup + wg.Add(1) + gate.Init(*rpcPort, *wsPort) + gate.Run() + wg.Wait() +} diff --git a/src/msg_transfer/Makefile b/src/msg_transfer/Makefile new file mode 100644 index 000000000..1813594d5 --- /dev/null +++ b/src/msg_transfer/Makefile @@ -0,0 +1,26 @@ +.PHONY: all build run gotool install clean help + +BINARY_NAME=open_im_msg_transfer +BIN_DIR=../../bin/ +LAN_FILE=.go +GO_FILE:=${BINARY_NAME}${LAN_FILE} + +all: gotool build + +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE} + +run: + @go run ./ + +gotool: + go fmt ./ + go vet ./ + +install: + make build + mv ${BINARY_NAME} ${BIN_DIR} + +clean: + @if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi + diff --git a/src/msg_transfer/logic/db.go b/src/msg_transfer/logic/db.go new file mode 100644 index 000000000..99aa11474 --- /dev/null +++ b/src/msg_transfer/logic/db.go @@ -0,0 +1,20 @@ +package logic + +import ( + "Open_IM/src/common/db" + "Open_IM/src/common/db/mysql_model/im_mysql_model" + pbMsg "Open_IM/src/proto/chat" +) + +func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error { + seq, err := db.DB.IncrUserSeq(uid) + if err != nil { + return err + } + pbMsg.RecvSeq = seq + return db.DB.SaveUserChat(uid, pbMsg) +} + +func getGroupList(groupID string) ([]string, error) { + return im_mysql_model.SelectGroupList(groupID) +} diff --git a/src/msg_transfer/logic/history_msg_handler.go b/src/msg_transfer/logic/history_msg_handler.go new file mode 100644 index 000000000..827b89481 --- /dev/null +++ b/src/msg_transfer/logic/history_msg_handler.go @@ -0,0 +1,178 @@ +package logic + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/constant" + kfk "Open_IM/src/common/kafka" + "Open_IM/src/common/log" + pbMsg "Open_IM/src/proto/chat" + pb "Open_IM/src/proto/group" + pbPush "Open_IM/src/proto/push" + "Open_IM/src/utils" + "context" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "strings" +) + +type fcb func(msg []byte, msgKey string) + +type HistoryConsumerHandler struct { + msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup +} + +func (mc *HistoryConsumerHandler) Init() { + mc.msgHandle = make(map[string]fcb) + mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, + config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + +} + +func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { + log.InfoByKv("chat come mongo!!!", "", "chat", string(msg)) + pbData := pbMsg.WSToMsgSvrChatMsg{} + err := proto.Unmarshal(msg, &pbData) + if err != nil { + log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error()) + return + } + pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{} + pbSaveData.SendID = pbData.SendID + pbSaveData.SendTime = pbData.SendTime + pbSaveData.Content = pbData.Content + pbSaveData.MsgFrom = pbData.MsgFrom + pbSaveData.ContentType = pbData.ContentType + pbSaveData.SessionType = pbData.SessionType + pbSaveData.MsgID = pbData.MsgID + pbSaveData.RecvID = pbData.RecvID + pbSaveData.PlatformID = pbData.PlatformID + Options := utils.JsonStringToMap(pbData.Options) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(Options, "history") + //Control whether to store history messages (mysql) + isPersist := utils.GetSwitchFromOptions(Options, "persistent") + //Control whether to push message to sender's other terminal + isSenderSync := utils.GetSwitchFromOptions(Options, "senderSync") + if pbData.SessionType == constant.SingleChatType { + log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist, isSenderSync) + if isHistory { + if msgKey == pbSaveData.RecvID { + err := saveUserChat(pbData.RecvID, &pbSaveData) + if err != nil { + log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error()) + } + pbSaveData.Options = pbData.Options + pbSaveData.OfflineInfo = pbData.OfflineInfo + sendMessageToPush(&pbSaveData) + } else if msgKey == pbSaveData.SendID { + err := saveUserChat(pbData.SendID, &pbSaveData) + if err != nil { + log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error()) + } + if isSenderSync { + pbSaveData.ContentType = constant.SyncSenderMsg + log.WarnByKv("SyncSenderMsg come here", pbData.OperationID, pbSaveData.String()) + sendMessageToPush(&pbSaveData) + } + } + + } + } else if pbData.SessionType == constant.GroupChatType { + log.Info("", "", "msg_transfer chat type = GroupChatType") + + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) + client := pb.NewGroupClient(etcdConn) + req := &pb.GetGroupInfoReq{ + GroupID: pbSaveData.RecvID, + Token: pbData.Token, + OperationID: pbSaveData.OperationID, + } + log.Info("", "", "msg_transfer call group rpc, data = %s", req.String()) + reply, err := client.GetGroupInfo(context.Background(), req) + if err != nil { + log.Error("", "", "msg_transfer client.GetGroupInfo fail, err = %s", err.Error()) + return + } + for _, v := range reply.GroupMemberList { + //Store RecvID is userID+" "+groupID when chatType is Group + pbSaveData.RecvID = v.UserID + " " + pbSaveData.RecvID + if isHistory { + saveUserChat(v.UserID, &pbSaveData) + } + pbSaveData.Options = pbData.Options + pbSaveData.OfflineInfo = pbData.OfflineInfo + if v.UserID != pbSaveData.SendID { + if utils.IsContain(v.UserID, pbData.ForceList) { + pbSaveData.IsEmphasize = true + } + sendMessageToPush(&pbSaveData) + } else { + if isSenderSync { + pbSaveData.ContentType = constant.SyncSenderMsg + sendMessageToPush(&pbSaveData) + } + + } + + } + + } else { + log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType) + } + + log.InfoByKv("msg_transfer handle topic success...", "", "") +} + +func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + } + return nil +} +func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { + msg := pbPush.PushMsgReq{} + msg.OperationID = message.OperationID + msg.PlatformID = message.PlatformID + msg.Content = message.Content + msg.ContentType = message.ContentType + msg.SessionType = message.SessionType + msg.RecvID = message.RecvID + msg.SendID = message.SendID + msg.IsEmphasize = message.IsEmphasize + msg.MsgFrom = message.MsgFrom + msg.Options = message.Options + msg.RecvSeq = message.RecvSeq + msg.SendTime = message.SendTime + msg.MsgID = message.MsgID + msg.OfflineInfo = message.OfflineInfo + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName) + if grpcConn == nil { + log.ErrorByKv("rpc dial failed", msg.OperationID, "push data", msg.String()) + pid, offset, err := producer.SendMessage(message) + if err != nil { + log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) + } + return + } + msgClient := pbPush.NewPushMsgServiceClient(grpcConn) + _, err := msgClient.PushMsg(context.Background(), &msg) + defer grpcConn.Close() + if err != nil { + log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error()) + pid, offset, err := producer.SendMessage(message) + if err != nil { + log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) + } + } else { + log.InfoByKv("rpc send success", msg.OperationID, "push data", msg.String()) + + } +} diff --git a/src/msg_transfer/logic/init.go b/src/msg_transfer/logic/init.go new file mode 100644 index 000000000..03a7d2c33 --- /dev/null +++ b/src/msg_transfer/logic/init.go @@ -0,0 +1,25 @@ +package logic + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/kafka" + "Open_IM/src/common/log" +) + +var ( + persistentCH PersistentConsumerHandler + historyCH HistoryConsumerHandler + producer *kafka.Producer +) + +func Init() { + log.NewPrivateLog(config.Config.ModuleName.MsgTransferName) + persistentCH.Init() + historyCH.Init() + producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) +} +func Run() { + //register mysqlConsumerHandler to + go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) + go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) +} diff --git a/src/msg_transfer/logic/persistent_msg_handler.go b/src/msg_transfer/logic/persistent_msg_handler.go new file mode 100644 index 000000000..9a54571f4 --- /dev/null +++ b/src/msg_transfer/logic/persistent_msg_handler.go @@ -0,0 +1,63 @@ +/* +** description(""). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/5/11 15:37). + */ +package logic + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/db/mysql_model/im_mysql_msg_model" + kfk "Open_IM/src/common/kafka" + "Open_IM/src/common/log" + pbMsg "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" +) + +type PersistentConsumerHandler struct { + msgHandle map[string]fcb + persistentConsumerGroup *kfk.MConsumerGroup +} + +func (pc *PersistentConsumerHandler) Init() { + pc.msgHandle = make(map[string]fcb) + pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql + pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, + config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) + +} +func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) { + log.InfoByKv("chat come here mysql!!!", "", "chat", string(msg)) + pbData := pbMsg.WSToMsgSvrChatMsg{} + err := proto.Unmarshal(msg, &pbData) + if err != nil { + log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error()) + return + } + Options := utils.JsonStringToMap(pbData.Options) + //Control whether to store history messages (mysql) + isPersist := utils.GetSwitchFromOptions(Options, "persistent") + //Only process receiver data + if isPersist && msgKey == pbData.RecvID { + log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) + if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { + log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) + return + } + } + +} +func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value)) + pc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + } + return nil +} diff --git a/src/msg_transfer/open_im_msg_transfer.go b/src/msg_transfer/open_im_msg_transfer.go new file mode 100644 index 000000000..0a3775113 --- /dev/null +++ b/src/msg_transfer/open_im_msg_transfer.go @@ -0,0 +1,14 @@ +package main + +import ( + "Open_IM/src/msg_transfer/logic" + "sync" +) + +func main() { + var wg sync.WaitGroup + wg.Add(1) + logic.Init() + logic.Run() + wg.Wait() +}