mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 10:22:36 +08:00
rpc chat
This commit is contained in:
parent
ff0e398a55
commit
26ea2cdc3d
13
src/rpc/auth/open_im_auth.go
Normal file
13
src/rpc/auth/open_im_auth.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
rpcAuth "Open_IM/src/rpc/auth/auth"
|
||||||
|
"flag"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
rpcPort := flag.Int("port", 10600, "RpcToken default listen port 10800")
|
||||||
|
flag.Parse()
|
||||||
|
rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort)
|
||||||
|
rpcServer.Run()
|
||||||
|
}
|
25
src/rpc/chat/Makefile
Normal file
25
src/rpc/chat/Makefile
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
.PHONY: all build run gotool install clean help
|
||||||
|
|
||||||
|
BINARY_NAME=open_im_msg
|
||||||
|
BIN_DIR=../../../bin/
|
||||||
|
LAN_FILE=.go
|
||||||
|
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||||
|
|
||||||
|
all: gotool build
|
||||||
|
|
||||||
|
build:
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
|
||||||
|
|
||||||
|
run:
|
||||||
|
@go run ./
|
||||||
|
|
||||||
|
gotool:
|
||||||
|
go fmt ./
|
||||||
|
go vet ./
|
||||||
|
|
||||||
|
install:
|
||||||
|
make build
|
||||||
|
mv ${BINARY_NAME} ${BIN_DIR}
|
||||||
|
|
||||||
|
clean:
|
||||||
|
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
159
src/rpc/chat/chat/pull_message.go
Normal file
159
src/rpc/chat/chat/pull_message.go
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
//实现pb定义的rpc服务
|
||||||
|
package rpcChat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
commonDB "Open_IM/src/common/db"
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
pbMsg "Open_IM/src/proto/chat"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) {
|
||||||
|
log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String())
|
||||||
|
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
|
||||||
|
seq, err := commonDB.DB.GetUserSeq(in.UserID)
|
||||||
|
resp := new(pbMsg.GetNewSeqResp)
|
||||||
|
if err == nil {
|
||||||
|
resp.Seq = seq
|
||||||
|
resp.ErrCode = 0
|
||||||
|
resp.ErrMsg = ""
|
||||||
|
return resp, err
|
||||||
|
} else {
|
||||||
|
log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error())
|
||||||
|
resp.Seq = 0
|
||||||
|
resp.ErrCode = 0
|
||||||
|
resp.ErrMsg = ""
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//func (s *MsgServer) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) {
|
||||||
|
// log.InfoByArgs(fmt.Sprintf("rpc pullMessage is arriving,args=%s", in.String()))
|
||||||
|
// resp := new(pbMsg.PullMessageResp)
|
||||||
|
// var respMsgFormat []*pbMsg.MsgFormat
|
||||||
|
// var respUserMsgFormat []*pbMsg.UserMsgFormat
|
||||||
|
// conn := db.NewDbConnection()
|
||||||
|
// rows, err := conn.Table("receive r").Select("c.sender_id,c.receiver_id,"+
|
||||||
|
// "c.msg_type,c.push_msg_type,c.chat_type,c.msg_id,c.send_content,r.seq,c.send_time,c.sender_nickname,c.receiver_nickname,c.sender_head_url,c.receiver_head_url").
|
||||||
|
// Joins("INNER JOIN chat_log c ON r.msg_id = c.msg_id AND r.user_id = ? AND seq BETWEEN ? AND ?",
|
||||||
|
// in.UserID, in.SeqBegin, in.SeqEnd).Rows()
|
||||||
|
// if err != nil {
|
||||||
|
// fmt.Printf("pullMsg data error: %v\n", err)
|
||||||
|
// resp.ErrCode = 1
|
||||||
|
// resp.ErrMsg = err.Error()
|
||||||
|
// return resp, nil
|
||||||
|
// }
|
||||||
|
// defer rows.Close()
|
||||||
|
// for rows.Next() {
|
||||||
|
// tempResp := new(pbMsg.MsgFormat)
|
||||||
|
// rows.Scan(&tempResp.SendID, &tempResp.RecvID, &tempResp.MsgType, &tempResp.PushMsgType, &tempResp.ChatType,
|
||||||
|
// &tempResp.MsgID, &tempResp.Msg, &tempResp.Seq, &tempResp.Time, &tempResp.SendNickName, &tempResp.RecvNickName,
|
||||||
|
// &tempResp.SendHeadUrl, &tempResp.RecvHeadUrl)
|
||||||
|
// respMsgFormat = append(respMsgFormat, tempResp)
|
||||||
|
// }
|
||||||
|
// respUserMsgFormat = msgHandleByUser(respMsgFormat, in.UserID)
|
||||||
|
// return &pbMsg.PullMessageResp{
|
||||||
|
// ErrCode: 0,
|
||||||
|
// ErrMsg: "",
|
||||||
|
// UserMsg: respUserMsgFormat,
|
||||||
|
// }, nil
|
||||||
|
//}
|
||||||
|
func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) {
|
||||||
|
log.InfoByKv("rpc pullMessage is arriving", in.OperationID, "args", in.String())
|
||||||
|
resp := new(pbMsg.PullMessageResp)
|
||||||
|
var respSingleMsgFormat []*pbMsg.GatherFormat
|
||||||
|
var respGroupMsgFormat []*pbMsg.GatherFormat
|
||||||
|
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("pullMsg data error", in.OperationID, in.String())
|
||||||
|
resp.ErrCode = 1
|
||||||
|
resp.ErrMsg = err.Error()
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
|
||||||
|
respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
|
||||||
|
return &pbMsg.PullMessageResp{
|
||||||
|
ErrCode: 0,
|
||||||
|
ErrMsg: "",
|
||||||
|
MaxSeq: MaxSeq,
|
||||||
|
MinSeq: MinSeq,
|
||||||
|
SingleUserMsg: respSingleMsgFormat,
|
||||||
|
GroupUserMsg: respGroupMsgFormat,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
|
||||||
|
var userid string
|
||||||
|
var respMsgFormat []*pbMsg.GatherFormat
|
||||||
|
m := make(map[string]MsgFormats)
|
||||||
|
//将消息以用户为维度聚集
|
||||||
|
for _, v := range allMsg {
|
||||||
|
if v.RecvID != ownerId {
|
||||||
|
userid = v.RecvID
|
||||||
|
} else {
|
||||||
|
userid = v.SendID
|
||||||
|
}
|
||||||
|
if value, ok := m[userid]; !ok {
|
||||||
|
var t MsgFormats
|
||||||
|
m[userid] = append(t, v)
|
||||||
|
} else {
|
||||||
|
m[userid] = append(value, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//形成pb格式返回
|
||||||
|
for user, msg := range m {
|
||||||
|
tempUserMsg := new(pbMsg.GatherFormat)
|
||||||
|
tempUserMsg.ID = user
|
||||||
|
tempUserMsg.List = msg
|
||||||
|
sort.Sort(msg)
|
||||||
|
respMsgFormat = append(respMsgFormat, tempUserMsg)
|
||||||
|
}
|
||||||
|
return respMsgFormat
|
||||||
|
}
|
||||||
|
func groupMsgHandleByUser(allMsg []*pbMsg.MsgFormat) []*pbMsg.GatherFormat {
|
||||||
|
var respMsgFormat []*pbMsg.GatherFormat
|
||||||
|
m := make(map[string]MsgFormats)
|
||||||
|
//将消息以用户为维度聚集
|
||||||
|
for _, v := range allMsg {
|
||||||
|
//获得群ID
|
||||||
|
groupID := strings.Split(v.RecvID, " ")[1]
|
||||||
|
if value, ok := m[groupID]; !ok {
|
||||||
|
var t MsgFormats
|
||||||
|
m[groupID] = append(t, v)
|
||||||
|
} else {
|
||||||
|
m[groupID] = append(value, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
//形成pb格式返回
|
||||||
|
for groupID, msg := range m {
|
||||||
|
tempUserMsg := new(pbMsg.GatherFormat)
|
||||||
|
tempUserMsg.ID = groupID
|
||||||
|
tempUserMsg.List = msg
|
||||||
|
sort.Sort(msg)
|
||||||
|
respMsgFormat = append(respMsgFormat, tempUserMsg)
|
||||||
|
}
|
||||||
|
return respMsgFormat
|
||||||
|
}
|
||||||
|
|
||||||
|
type MsgFormats []*pbMsg.MsgFormat
|
||||||
|
|
||||||
|
// 实现sort.Interface接口取元素数量方法
|
||||||
|
func (s MsgFormats) Len() int {
|
||||||
|
return len(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 实现sort.Interface接口比较元素方法
|
||||||
|
func (s MsgFormats) Less(i, j int) bool {
|
||||||
|
return s[i].SendTime < s[j].SendTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// 实现sort.Interface接口交换元素方法
|
||||||
|
func (s MsgFormats) Swap(i, j int) {
|
||||||
|
s[i], s[j] = s[j], s[i]
|
||||||
|
}
|
65
src/rpc/chat/chat/rpcChat.go
Normal file
65
src/rpc/chat/chat/rpcChat.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package rpcChat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/config"
|
||||||
|
"Open_IM/src/common/kafka"
|
||||||
|
log2 "Open_IM/src/common/log"
|
||||||
|
pbChat "Open_IM/src/proto/chat"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcChat struct {
|
||||||
|
rpcPort int
|
||||||
|
rpcRegisterName string
|
||||||
|
etcdSchema string
|
||||||
|
etcdAddr []string
|
||||||
|
producer *kafka.Producer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRpcChatServer(port int) *rpcChat {
|
||||||
|
rc := rpcChat{
|
||||||
|
rpcPort: port,
|
||||||
|
rpcRegisterName: config.Config.RpcRegisterName.OpenImOfflineMessageName,
|
||||||
|
etcdSchema: config.Config.Etcd.EtcdSchema,
|
||||||
|
etcdAddr: config.Config.Etcd.EtcdAddr,
|
||||||
|
}
|
||||||
|
rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
|
||||||
|
return &rc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rpc *rpcChat) Run() {
|
||||||
|
log2.Info("", "", "rpc get_token init...")
|
||||||
|
|
||||||
|
address := utils.ServerIP + ":" + strconv.Itoa(rpc.rpcPort)
|
||||||
|
listener, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
log2.Error("", "", "listen network failed, err = %s, address = %s", err.Error(), address)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log2.Info("", "", "listen network success, address = %s", address)
|
||||||
|
|
||||||
|
//grpc server
|
||||||
|
srv := grpc.NewServer()
|
||||||
|
defer srv.GracefulStop()
|
||||||
|
|
||||||
|
//service registers with etcd
|
||||||
|
|
||||||
|
pbChat.RegisterChatServer(srv, rpc)
|
||||||
|
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10)
|
||||||
|
if err != nil {
|
||||||
|
log2.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = srv.Serve(listener)
|
||||||
|
if err != nil {
|
||||||
|
log2.Info("", "", "rpc get_token fail, err = %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log2.Info("", "", "rpc get_token init success")
|
||||||
|
}
|
50
src/rpc/chat/chat/send_msg.go
Normal file
50
src/rpc/chat/chat/send_msg.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package rpcChat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/src/common/log"
|
||||||
|
pbChat "Open_IM/src/proto/chat"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"context"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
|
||||||
|
|
||||||
|
serverMsgID := GetMsgID(pb.SendID)
|
||||||
|
pbData := pbChat.WSToMsgSvrChatMsg{}
|
||||||
|
pbData.MsgFrom = pb.MsgFrom
|
||||||
|
pbData.SessionType = pb.SessionType
|
||||||
|
pbData.ContentType = pb.ContentType
|
||||||
|
pbData.Content = pb.Content
|
||||||
|
pbData.RecvID = pb.RecvID
|
||||||
|
pbData.ForceList = pb.ForceList
|
||||||
|
pbData.OfflineInfo = pb.OffLineInfo
|
||||||
|
pbData.Options = pb.Options
|
||||||
|
pbData.PlatformID = pb.PlatformID
|
||||||
|
pbData.SendID = pb.SendID
|
||||||
|
pbData.MsgID = serverMsgID
|
||||||
|
pbData.OperationID = pb.OperationID
|
||||||
|
pbData.Token = pb.Token
|
||||||
|
pbData.SendTime = utils.GetCurrentTimestampBySecond()
|
||||||
|
rpc.sendMsgToKafka(&pbData, pbData.RecvID)
|
||||||
|
rpc.sendMsgToKafka(&pbData, pbData.SendID)
|
||||||
|
replay := pbChat.UserSendMsgResp{}
|
||||||
|
replay.ReqIdentifier = pb.ReqIdentifier
|
||||||
|
replay.MsgIncr = pb.MsgIncr
|
||||||
|
replay.ClientMsgID = pb.ClientMsgID
|
||||||
|
replay.ServerMsgID = serverMsgID
|
||||||
|
|
||||||
|
return &replay, nil
|
||||||
|
}
|
||||||
|
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) {
|
||||||
|
pid, offset, err := rpc.producer.SendMessage(m, key)
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func GetMsgID(sendID string) string {
|
||||||
|
t := time.Now().Format("2006-01-02 15:04:05")
|
||||||
|
return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
|
||||||
|
}
|
14
src/rpc/chat/open_im_msg.go
Normal file
14
src/rpc/chat/open_im_msg.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
rpcChat "Open_IM/src/rpc/chat/chat"
|
||||||
|
"Open_IM/src/utils"
|
||||||
|
"flag"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
rpcPort := flag.String("port", "", "rpc listening port")
|
||||||
|
flag.Parse()
|
||||||
|
rpcServer := rpcChat.NewRpcChatServer(utils.StringToInt(*rpcPort))
|
||||||
|
rpcServer.Run()
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user