mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-05-29 09:19:18 +08:00
Merge remote-tracking branch 'origin/main'
This commit is contained in:
commit
b87900b070
@ -105,13 +105,13 @@ business data.
|
||||
- **MySQL**
|
||||
|
||||
```
|
||||
待补充
|
||||
...
|
||||
```
|
||||
|
||||
- **MongoDB**
|
||||
|
||||
```
|
||||
待补充
|
||||
...
|
||||
```
|
||||
|
||||
6. Enter the script directory and execute the script according to the steps。
|
||||
|
130
config/config.yaml
Normal file
130
config/config.yaml
Normal file
@ -0,0 +1,130 @@
|
||||
# The class cannot be named by Pascal or camel case.
|
||||
# If it is not used, the corresponding structure will not be set,
|
||||
# and it will not be read naturally.
|
||||
|
||||
#---------------Infrastructure configuration---------------------#
|
||||
etcd:
|
||||
etcdSchema: openIM
|
||||
etcdAddr: [ 47.112.160.66:2379 ]
|
||||
|
||||
mysql:
|
||||
dbAddress: [ 47.112.160.66:3306 ]
|
||||
dbUserName: root
|
||||
dbPassword: 123456
|
||||
dbDatabaseName: openIM
|
||||
dbTableName: eMsg
|
||||
dbMsgTableNum: 1
|
||||
dbMaxOpenConns: 20
|
||||
dbMaxIdleConns: 10
|
||||
dbMaxLifeTime: 120
|
||||
|
||||
mongo:
|
||||
dbAddress: [ 47.112.160.66:27017 ]
|
||||
dbDirect: false
|
||||
dbTimeout: 10
|
||||
dbDatabase: [ openIM ]
|
||||
dbSource: admin
|
||||
dbUserName:
|
||||
dbPassword:
|
||||
dbMaxPoolSize: 20
|
||||
dbRetainChatRecords: 7
|
||||
|
||||
redis:
|
||||
dbAddress: [47.112.160.66:6379]
|
||||
dbMaxIdle: 128
|
||||
dbMaxActive: 0
|
||||
dbIdleTimeout: 120
|
||||
dbPassWord: open_im4545453
|
||||
|
||||
kafka:
|
||||
ws2mschat:
|
||||
addr: [ 47.112.160.66:9092 ]
|
||||
topic: "ws2ms_chat"
|
||||
ms2pschat:
|
||||
addr: [ 47.112.160.66:9092 ]
|
||||
topic: "ms2ps_chat"
|
||||
consumergroupid:
|
||||
msgToMongo: mongo
|
||||
msgToMySql: mysql
|
||||
msgToPush: push
|
||||
|
||||
|
||||
|
||||
#---------------Internal service configuration---------------------#
|
||||
|
||||
# The service ip default is empty,
|
||||
# automatically obtain the machine's valid network card ip as the service ip,
|
||||
# otherwise the configuration ip is preferred
|
||||
serverip:
|
||||
|
||||
api:
|
||||
openImApiPort: [ 10000 ]
|
||||
|
||||
credential:
|
||||
tencent:
|
||||
appID: 1302656840
|
||||
region: ap-chengdu
|
||||
bucket: echat-1302656840
|
||||
secretID: AKIDGNYVChzIQinu7QEgtNp0hnNgqcV8vZTC
|
||||
secretKey: kz15vW83qM6dBUWIq681eBZA0c0vlIbe
|
||||
|
||||
|
||||
rpcport:
|
||||
openImUserPort: [ 10100,10101 ]
|
||||
openImFriendPort: [ 10200,10201 ]
|
||||
openImOfflineMessagePort: [ 10300 ]
|
||||
openImOnlineRelayPort: [ 10400 ]
|
||||
openImGroupPort: [ 10500 ,10501 ]
|
||||
openImAuthPort: [ 10600, 10601 ]
|
||||
openImPushPort: [ 10700 ]
|
||||
|
||||
|
||||
rpcregistername:
|
||||
openImUserName: User
|
||||
openImFriendName: Friend
|
||||
openImOfflineMessageName: OfflineMessage
|
||||
openImPushName: Push
|
||||
openImOnlineMessageRelayName: OnlineMessageRelay
|
||||
openImGroupName: Group
|
||||
rpcGetTokenName: Auth
|
||||
|
||||
log:
|
||||
storageLocation: ../logs/
|
||||
elasticSearchSwitch: false
|
||||
elasticSearchAddr: [ 47.112.160.66:9201 ]
|
||||
elasticSearchUser: ""
|
||||
elasticSearchPassword: ""
|
||||
|
||||
modulename:
|
||||
longConnSvrName: msg_gateway
|
||||
msgTransferName: msg_transfer
|
||||
pushName: push
|
||||
|
||||
longconnsvr:
|
||||
websocketPort: [ 7778 ]
|
||||
websocketMaxConnNum: 10000
|
||||
websocketMaxMsgLen: 4096
|
||||
websocketTimeOut: 10
|
||||
|
||||
push:
|
||||
tpns:
|
||||
ios:
|
||||
accessID: 1600018281
|
||||
secretKey: 3cd68a77a95b89e5089a1aca523f318f
|
||||
android:
|
||||
accessID: 111
|
||||
secretKey: 111
|
||||
|
||||
secret: tuoyun
|
||||
|
||||
multiloginpolicy:
|
||||
onlyOneTerminalAccess: false
|
||||
mobileAndPCTerminalAccessButOtherTerminalKickEachOther: true
|
||||
allTerminalAccess: false
|
||||
|
||||
#token config
|
||||
tokenpolicy:
|
||||
accessSecret: "open_im_server"
|
||||
# Token effective time seconds as a unit
|
||||
#Seven days 7*24*60*60
|
||||
accessExpire: 604800
|
46
src/api/friend/add_blacklist.go
Normal file
46
src/api/friend/add_blacklist.go
Normal file
@ -0,0 +1,46 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsAddBlackList struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
}
|
||||
|
||||
func AddBlacklist(c *gin.Context) {
|
||||
log.Info("", "", "api add blacklist init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsSearchFriend{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.AddBlacklistReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api add blacklist is server:userID=%s", req.Uid)
|
||||
RpcResp, err := client.AddBlacklist(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call add blacklist rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call add blacklist rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call add blacklist rpc server success,args=%s", RpcResp.String())
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
log.InfoByArgs("api add blacklist success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
48
src/api/friend/add_friend.go
Normal file
48
src/api/friend/add_friend.go
Normal file
@ -0,0 +1,48 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsAddFriend struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
ReqMessage string `json:"reqMessage"`
|
||||
}
|
||||
|
||||
func AddFriend(c *gin.Context) {
|
||||
log.Info("", "", "api add friend init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsAddFriend{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.AddFriendReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
ReqMessage: params.ReqMessage,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api add friend is server")
|
||||
RpcResp, err := client.AddFriend(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call add friend rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call add friend rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call add friend rpc server success,args=%s", RpcResp.String())
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
log.InfoByArgs("api add friend success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
48
src/api/friend/add_friend_response.go
Normal file
48
src/api/friend/add_friend_response.go
Normal file
@ -0,0 +1,48 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsAddFriendResponse struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
Flag int32 `json:"flag" binding:"required"`
|
||||
}
|
||||
|
||||
func AddFriendResponse(c *gin.Context) {
|
||||
log.Info("", "", fmt.Sprintf("api add friend response init ...."))
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsAddFriendResponse{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.AddedFriendReq{
|
||||
Uid: params.UID,
|
||||
Flag: params.Flag,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api add friend response is server:userID=%s", req.Uid)
|
||||
RpcResp, err := client.AddedFriend(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call add_friend_response rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call add_friend_response rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call add friend response rpc server success,args=%s", RpcResp.String())
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg})
|
||||
log.InfoByArgs("api add friend response success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
47
src/api/friend/delete_friend.go
Normal file
47
src/api/friend/delete_friend.go
Normal file
@ -0,0 +1,47 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsDeleteFriend struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
}
|
||||
|
||||
func DeleteFriend(c *gin.Context) {
|
||||
log.Info("", "", fmt.Sprintf("api delete_friend init ...."))
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsDeleteFriend{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.DeleteFriendReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api delete_friend is server:%s", req.Uid)
|
||||
RpcResp, err := client.DeleteFriend(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call delete_friend rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call delete_friend rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call delete_friend rpc server,args=%s", RpcResp.String())
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
log.InfoByArgs("api delete_friend success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
78
src/api/friend/get_blcaklist.go
Normal file
78
src/api/friend/get_blcaklist.go
Normal file
@ -0,0 +1,78 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsGetBlackList struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
}
|
||||
|
||||
type blackListUserInfo struct {
|
||||
UID string `json:"uid"`
|
||||
Name string `json:"name"`
|
||||
Icon string `json:"icon"`
|
||||
Gender int32 `json:"gender"`
|
||||
Mobile string `json:"mobile"`
|
||||
Birth string `json:"birth"`
|
||||
Email string `json:"email"`
|
||||
Ex string `json:"ex"`
|
||||
}
|
||||
|
||||
func GetBlacklist(c *gin.Context) {
|
||||
log.Info("", "", "api get blacklist init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsGetBlackList{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.GetBlacklistReq{
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, fmt.Sprintf("api get blacklist is server"))
|
||||
RpcResp, err := client.GetBlacklist(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call get_friend_list rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call get blacklist rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call get blacklist rpc server success,args=%s", RpcResp.String())
|
||||
if RpcResp.ErrorCode == 0 {
|
||||
userBlackList := make([]blackListUserInfo, 0)
|
||||
for _, friend := range RpcResp.Data {
|
||||
var fi blackListUserInfo
|
||||
fi.UID = friend.Uid
|
||||
fi.Name = friend.Name
|
||||
fi.Icon = friend.Icon
|
||||
fi.Gender = friend.Gender
|
||||
fi.Mobile = friend.Mobile
|
||||
fi.Birth = friend.Birth
|
||||
fi.Email = friend.Email
|
||||
fi.Ex = friend.Ex
|
||||
userBlackList = append(userBlackList, fi)
|
||||
}
|
||||
resp := gin.H{
|
||||
"errCode": RpcResp.ErrorCode,
|
||||
"errMsg": RpcResp.ErrorMsg,
|
||||
"data": userBlackList,
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
} else {
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.InfoByArgs("api get black list success return,get args=%s,return=%s", req.String(), RpcResp.String())
|
||||
}
|
78
src/api/friend/get_friend_apply_list.go
Normal file
78
src/api/friend/get_friend_apply_list.go
Normal file
@ -0,0 +1,78 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsGetFriendApplyList struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
}
|
||||
type UserInfo struct {
|
||||
UID string `json:"uid"`
|
||||
Name string `json:"name"`
|
||||
Icon string `json:"icon"`
|
||||
Gender int32 `json:"gender"`
|
||||
Mobile string `json:"mobile"`
|
||||
Birth string `json:"birth"`
|
||||
Email string `json:"email"`
|
||||
Ex string `json:"ex"`
|
||||
ReqMessage string `json:"reqMessage"`
|
||||
ApplyTime string `json:"applyTime"`
|
||||
Flag int32 `json:"flag"`
|
||||
}
|
||||
|
||||
func GetFriendApplyList(c *gin.Context) {
|
||||
log.Info("", "", "api get_friend_apply_list init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsGetFriendApplyList{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.GetFriendApplyReq{
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api get friend apply list is server")
|
||||
RpcResp, err := client.GetFriendApplyList(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call get friend apply list rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call get friend apply list rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call get friend apply list rpc server success,args=%s", RpcResp.String())
|
||||
if RpcResp.ErrorCode == 0 {
|
||||
userInfoList := make([]UserInfo, 0)
|
||||
for _, applyUserinfo := range RpcResp.Data {
|
||||
var un UserInfo
|
||||
un.UID = applyUserinfo.Uid
|
||||
un.Name = applyUserinfo.Name
|
||||
un.Icon = applyUserinfo.Icon
|
||||
un.Gender = applyUserinfo.Gender
|
||||
un.Mobile = applyUserinfo.Mobile
|
||||
un.Birth = applyUserinfo.Birth
|
||||
un.Email = applyUserinfo.Email
|
||||
un.Ex = applyUserinfo.Ex
|
||||
un.Flag = applyUserinfo.Flag
|
||||
un.ApplyTime = applyUserinfo.ApplyTime
|
||||
un.ReqMessage = applyUserinfo.ReqMessage
|
||||
userInfoList = append(userInfoList, un)
|
||||
}
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg, "data": userInfoList}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
} else {
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.InfoByArgs("api get friend apply list success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
82
src/api/friend/get_friend_list.go
Normal file
82
src/api/friend/get_friend_list.go
Normal file
@ -0,0 +1,82 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsGetFriendLIst struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
}
|
||||
|
||||
type friendInfo struct {
|
||||
UID string `json:"uid"`
|
||||
Name string `json:"name"`
|
||||
Icon string `json:"icon"`
|
||||
Gender int32 `json:"gender"`
|
||||
Mobile string `json:"mobile"`
|
||||
Birth string `json:"birth"`
|
||||
Email string `json:"email"`
|
||||
Ex string `json:"ex"`
|
||||
Comment string `json:"comment"`
|
||||
IsInBlackList int32 `json:"isInBlackList"`
|
||||
}
|
||||
|
||||
func GetFriendList(c *gin.Context) {
|
||||
log.Info("", "", fmt.Sprintf("api get_friendlist init ...."))
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsGetFriendLIst{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.GetFriendListReq{
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api get friend list is server")
|
||||
RpcResp, err := client.GetFriendList(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call get friend list rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call get friend list rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call get friend list rpc server success,args=%s", RpcResp.String())
|
||||
if RpcResp.ErrorCode == 0 {
|
||||
friendsInfo := make([]friendInfo, 0)
|
||||
for _, friend := range RpcResp.Data {
|
||||
var fi friendInfo
|
||||
fi.UID = friend.Uid
|
||||
fi.Name = friend.Name
|
||||
fi.Icon = friend.Icon
|
||||
fi.Gender = friend.Gender
|
||||
fi.Mobile = friend.Mobile
|
||||
fi.Birth = friend.Birth
|
||||
fi.Email = friend.Email
|
||||
fi.Ex = friend.Ex
|
||||
fi.Comment = friend.Comment
|
||||
fi.IsInBlackList = friend.IsInBlackList
|
||||
friendsInfo = append(friendsInfo, fi)
|
||||
}
|
||||
resp := gin.H{
|
||||
"errCode": RpcResp.ErrorCode,
|
||||
"errMsg": RpcResp.ErrorMsg,
|
||||
"data": friendsInfo,
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
} else {
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.InfoByArgs("api get friend list success return,get args=%s,return=%s", req.String(), RpcResp.String())
|
||||
}
|
46
src/api/friend/remove_blacklist.go
Normal file
46
src/api/friend/remove_blacklist.go
Normal file
@ -0,0 +1,46 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsRemoveBlackList struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
}
|
||||
|
||||
func RemoveBlacklist(c *gin.Context) {
|
||||
log.Info("", "", "api remove_blacklist init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsRemoveBlackList{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.RemoveBlacklistReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api remove blacklist is server:userID=%s", req.Uid)
|
||||
RpcResp, err := client.RemoveBlacklist(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call remove blacklist rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call remove blacklist rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call remove blacklist rpc server success,args=%s", RpcResp.String())
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
log.InfoByArgs("api remove blacklist success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
70
src/api/friend/search_friend.go
Normal file
70
src/api/friend/search_friend.go
Normal file
@ -0,0 +1,70 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsSearchFriend struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
}
|
||||
|
||||
func SearchFriend(c *gin.Context) {
|
||||
log.Info("", "", fmt.Sprintf("api search friend init ...."))
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsSearchFriend{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.SearchFriendReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api search_friend is server")
|
||||
RpcResp, err := client.SearchFriend(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call search friend rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call search friend rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByArgs("call search friend rpc server success,args=%s", RpcResp.String())
|
||||
if RpcResp.ErrorCode == 0 {
|
||||
resp := gin.H{
|
||||
"errCode": RpcResp.ErrorCode,
|
||||
"errMsg": RpcResp.ErrorMsg,
|
||||
"data": gin.H{
|
||||
"uid": RpcResp.Data.Uid,
|
||||
"icon": RpcResp.Data.Icon,
|
||||
"name": RpcResp.Data.Name,
|
||||
"gender": RpcResp.Data.Gender,
|
||||
"mobile": RpcResp.Data.Mobile,
|
||||
"birth": RpcResp.Data.Birth,
|
||||
"email": RpcResp.Data.Email,
|
||||
"ex": RpcResp.Data.Ex,
|
||||
"comment": RpcResp.Data.Comment,
|
||||
"isFriend": RpcResp.Data.IsFriend,
|
||||
"isInBlackList": RpcResp.Data.IsInBlackList,
|
||||
},
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
} else {
|
||||
resp := gin.H{
|
||||
"errCode": RpcResp.ErrorCode,
|
||||
"errMsg": RpcResp.ErrorMsg,
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.InfoByArgs("api search_friend success return,get args=%s,return=%s", req.String(), RpcResp.String())
|
||||
}
|
47
src/api/friend/set_friend_comment.go
Normal file
47
src/api/friend/set_friend_comment.go
Normal file
@ -0,0 +1,47 @@
|
||||
package friend
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbFriend "Open_IM/src/proto/friend"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsSetFriendComment struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UID string `json:"uid" binding:"required"`
|
||||
Comment string `json:"comment"`
|
||||
}
|
||||
|
||||
func SetFriendComment(c *gin.Context) {
|
||||
log.Info("", "", "api set friend comment init ....")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName)
|
||||
client := pbFriend.NewFriendClient(etcdConn)
|
||||
|
||||
params := paramsSetFriendComment{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbFriend.SetFriendCommentReq{
|
||||
Uid: params.UID,
|
||||
OperationID: params.OperationID,
|
||||
Comment: params.Comment,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.Info(req.Token, req.OperationID, "api set friend comment is server")
|
||||
RpcResp, err := client.SetFriendComment(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call set friend comment rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call set friend comment rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.Info("", "", "call set friend comment rpc server success,args=%s", RpcResp.String())
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg})
|
||||
log.Info("", "", "api set friend comment success return,get args=%s,return args=%s", req.String(), RpcResp.String())
|
||||
}
|
73
src/api/user/get_user_info.go
Normal file
73
src/api/user/get_user_info.go
Normal file
@ -0,0 +1,73 @@
|
||||
package user
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbUser "Open_IM/src/proto/user"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type userInfo struct {
|
||||
UID string `json:"uid"`
|
||||
Name string `json:"name"`
|
||||
Icon string `json:"icon"`
|
||||
Gender int32 `json:"gender"`
|
||||
Mobile string `json:"mobile"`
|
||||
Birth string `json:"birth"`
|
||||
Email string `json:"email"`
|
||||
Ex string `json:"ex"`
|
||||
}
|
||||
|
||||
func GetUserInfo(c *gin.Context) {
|
||||
log.InfoByKv("api get userinfo init...", "")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
|
||||
client := pbUser.NewUserClient(etcdConn)
|
||||
|
||||
params := paramsStruct{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbUser.GetUserInfoReq{
|
||||
UserIDList: params.UIDList,
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
}
|
||||
log.InfoByKv("api get user info is server", c.PostForm("OperationID"), c.Request.Header.Get("token"))
|
||||
RpcResp, err := client.GetUserInfo(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call get user info rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"errorCode": 500,
|
||||
"errorMsg": "call rpc server failed",
|
||||
})
|
||||
return
|
||||
}
|
||||
log.InfoByKv("call get user info rpc server success", params.OperationID)
|
||||
if RpcResp.ErrorCode == 0 {
|
||||
userInfoList := make([]userInfo, 0)
|
||||
for _, user := range RpcResp.Data {
|
||||
var ui userInfo
|
||||
ui.UID = user.Uid
|
||||
ui.Name = user.Name
|
||||
ui.Icon = user.Icon
|
||||
ui.Gender = user.Gender
|
||||
ui.Mobile = user.Mobile
|
||||
ui.Birth = user.Birth
|
||||
ui.Email = user.Email
|
||||
ui.Ex = user.Ex
|
||||
userInfoList = append(userInfoList, ui)
|
||||
}
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg, "data": userInfoList}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
} else {
|
||||
resp := gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.InfoByKv("api get user info return success", params.OperationID, "args=%s", RpcResp.String())
|
||||
}
|
59
src/api/user/update_user_info.go
Normal file
59
src/api/user/update_user_info.go
Normal file
@ -0,0 +1,59 @@
|
||||
package user
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
pbUser "Open_IM/src/proto/user"
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type paramsStruct struct {
|
||||
OperationID string `json:"operationID" binding:"required"`
|
||||
UIDList []string `json:"uidList"`
|
||||
Platform int32 `json:"platform"`
|
||||
Name string `json:"name"`
|
||||
Icon string `json:"icon"`
|
||||
Gender int32 `json:"gender"`
|
||||
Mobile string `json:"mobile"`
|
||||
Birth string `json:"birth"`
|
||||
Email string `json:"email"`
|
||||
Ex string `json:"ex"`
|
||||
}
|
||||
|
||||
func UpdateUserInfo(c *gin.Context) {
|
||||
log.InfoByKv("api update userinfo init...", "")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
|
||||
client := pbUser.NewUserClient(etcdConn)
|
||||
|
||||
params := paramsStruct{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
req := &pbUser.UpdateUserInfoReq{
|
||||
OperationID: params.OperationID,
|
||||
Token: c.Request.Header.Get("token"),
|
||||
Name: params.Name,
|
||||
Icon: params.Icon,
|
||||
Gender: params.Gender,
|
||||
Mobile: params.Mobile,
|
||||
Birth: params.Birth,
|
||||
Email: params.Email,
|
||||
Ex: params.Ex,
|
||||
}
|
||||
log.InfoByKv("api update user info is server", req.OperationID, req.Token)
|
||||
RpcResp, err := client.UpdateUserInfo(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error(req.Token, req.OperationID, "err=%s,call get user info rpc server failed", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.InfoByKv("call update user info rpc server success", params.OperationID)
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": RpcResp.ErrorCode, "errMsg": RpcResp.ErrorMsg})
|
||||
log.InfoByKv("api update user info return success", params.OperationID, "args=%s", RpcResp.String())
|
||||
}
|
26
src/msg_gateway/Makefile
Normal file
26
src/msg_gateway/Makefile
Normal file
@ -0,0 +1,26 @@
|
||||
.PHONY: all build run gotool install clean help
|
||||
|
||||
BINARY_NAME=open_im_msg_gateway
|
||||
BIN_DIR=../../bin/
|
||||
LAN_FILE=.go
|
||||
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||
|
||||
all: gotool build
|
||||
|
||||
build:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
|
||||
|
||||
run:
|
||||
@go run ./
|
||||
|
||||
gotool:
|
||||
go fmt ./
|
||||
go vet ./
|
||||
|
||||
install:
|
||||
make build
|
||||
mv ${BINARY_NAME} ${BIN_DIR}
|
||||
|
||||
clean:
|
||||
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
||||
|
29
src/msg_gateway/gate/init.go
Normal file
29
src/msg_gateway/gate/init.go
Normal file
@ -0,0 +1,29 @@
|
||||
package gate
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
"github.com/go-playground/validator/v10"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
rwLock *sync.RWMutex
|
||||
validate *validator.Validate
|
||||
ws WServer
|
||||
rpcSvr RPCServer
|
||||
)
|
||||
|
||||
func Init(rpcPort, wsPort int) {
|
||||
//log initialization
|
||||
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
|
||||
rwLock = new(sync.RWMutex)
|
||||
validate = validator.New()
|
||||
ws.onInit(wsPort)
|
||||
rpcSvr.onInit(rpcPort)
|
||||
}
|
||||
|
||||
func Run() {
|
||||
go ws.run()
|
||||
go rpcSvr.run()
|
||||
}
|
197
src/msg_gateway/gate/logic.go
Normal file
197
src/msg_gateway/gate/logic.go
Normal file
@ -0,0 +1,197 @@
|
||||
package gate
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/constant"
|
||||
"Open_IM/src/common/log"
|
||||
pbChat "Open_IM/src/proto/chat"
|
||||
"Open_IM/src/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) {
|
||||
//ws online debug data
|
||||
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
|
||||
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
|
||||
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
|
||||
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
|
||||
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
|
||||
m := Req{}
|
||||
if err := json.Unmarshal(jsonMsg, &m); err != nil {
|
||||
log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error())
|
||||
ws.sendErrMsg(conn, 200, err.Error())
|
||||
return
|
||||
}
|
||||
if err := validate.Struct(m); err != nil {
|
||||
log.ErrorByKv("ws args validate err", "", "err", err.Error())
|
||||
ws.sendErrMsg(conn, 201, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if !utils.VerifyToken(m.Token, m.SendID) {
|
||||
ws.sendErrMsg(conn, 202, "token validate err")
|
||||
return
|
||||
}
|
||||
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
|
||||
|
||||
switch m.ReqIdentifier {
|
||||
case constant.WSGetNewestSeq:
|
||||
ws.newestSeqReq(conn, &m)
|
||||
case constant.WSPullMsg:
|
||||
ws.pullMsgReq(conn, &m)
|
||||
case constant.WSSendMsg:
|
||||
ws.sendMsgReq(conn, &m)
|
||||
default:
|
||||
}
|
||||
}
|
||||
func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) {
|
||||
mReply := make(map[string]interface{})
|
||||
mData := make(map[string]interface{})
|
||||
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||
mReply["msgIncr"] = m.MsgIncr
|
||||
mReply["errCode"] = pb.GetErrCode()
|
||||
mReply["errMsg"] = pb.GetErrMsg()
|
||||
mData["seq"] = pb.GetSeq()
|
||||
mReply["data"] = mData
|
||||
ws.sendMsg(conn, mReply)
|
||||
}
|
||||
func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) {
|
||||
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
|
||||
pbData := pbChat.GetNewSeqReq{}
|
||||
pbData.UserID = m.SendID
|
||||
pbData.OperationID = m.OperationID
|
||||
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||
if grpcConn == nil {
|
||||
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
|
||||
}
|
||||
msgClient := pbChat.NewChatClient(grpcConn)
|
||||
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
|
||||
return
|
||||
}
|
||||
log.InfoByKv("rpc call success to getNewSeq", pbData.OperationID, "replyData", reply.String())
|
||||
ws.newestSeqResp(conn, m, reply)
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) {
|
||||
mReply := make(map[string]interface{})
|
||||
msg := make(map[string]interface{})
|
||||
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||
mReply["msgIncr"] = m.MsgIncr
|
||||
mReply["errCode"] = pb.GetErrCode()
|
||||
mReply["errMsg"] = pb.GetErrMsg()
|
||||
//空切片
|
||||
if v := pb.GetSingleUserMsg(); v != nil {
|
||||
msg["single"] = v
|
||||
} else {
|
||||
msg["single"] = []pbChat.GatherFormat{}
|
||||
}
|
||||
if v := pb.GetGroupUserMsg(); v != nil {
|
||||
msg["group"] = v
|
||||
} else {
|
||||
msg["group"] = []pbChat.GatherFormat{}
|
||||
}
|
||||
msg["maxSeq"] = pb.GetMaxSeq()
|
||||
msg["minSeq"] = pb.GetMinSeq()
|
||||
mReply["data"] = msg
|
||||
ws.sendMsg(conn, mReply)
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) {
|
||||
log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m)
|
||||
reply := new(pbChat.PullMessageResp)
|
||||
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg)
|
||||
if isPass {
|
||||
pbData := pbChat.PullMessageReq{}
|
||||
pbData.UserID = m.SendID
|
||||
pbData.OperationID = m.OperationID
|
||||
pbData.SeqBegin = data.(SeqData).SeqBegin
|
||||
pbData.SeqEnd = data.(SeqData).SeqEnd
|
||||
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||
msgClient := pbChat.NewChatClient(grpcConn)
|
||||
reply, err := msgClient.PullMessage(context.Background(), &pbData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
|
||||
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
|
||||
ws.pullMsgResp(conn, m, reply)
|
||||
} else {
|
||||
reply.ErrCode = errCode
|
||||
reply.ErrMsg = errMsg
|
||||
ws.pullMsgResp(conn, m, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) {
|
||||
mReply := make(map[string]interface{})
|
||||
mReplyData := make(map[string]interface{})
|
||||
mReply["reqIdentifier"] = m.ReqIdentifier
|
||||
mReply["msgIncr"] = m.MsgIncr
|
||||
mReply["errCode"] = pb.GetErrCode()
|
||||
mReply["errMsg"] = pb.GetErrMsg()
|
||||
mReplyData["clientMsgID"] = pb.GetClientMsgID()
|
||||
mReplyData["serverMsgID"] = pb.GetServerMsgID()
|
||||
mReply["data"] = mReplyData
|
||||
ws.sendMsg(conn, mReply)
|
||||
}
|
||||
|
||||
func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) {
|
||||
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
|
||||
reply := new(pbChat.UserSendMsgResp)
|
||||
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
||||
if isPass {
|
||||
data := pData.(MsgData)
|
||||
pbData := pbChat.UserSendMsgReq{
|
||||
ReqIdentifier: m.ReqIdentifier,
|
||||
Token: m.Token,
|
||||
SendID: m.SendID,
|
||||
OperationID: m.OperationID,
|
||||
MsgIncr: m.MsgIncr,
|
||||
PlatformID: data.PlatformID,
|
||||
SessionType: data.SessionType,
|
||||
MsgFrom: data.MsgFrom,
|
||||
ContentType: data.ContentType,
|
||||
RecvID: data.RecvID,
|
||||
ForceList: data.ForceList,
|
||||
Content: data.Content,
|
||||
Options: utils.MapToJsonString(data.Options),
|
||||
ClientMsgID: data.ClientMsgID,
|
||||
OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
|
||||
}
|
||||
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||
client := pbChat.NewChatClient(etcdConn)
|
||||
log.Info("", "", "api UserSendMsg call, api call rpc...")
|
||||
reply, _ := client.UserSendMsg(context.Background(), &pbData)
|
||||
log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
|
||||
ws.sendMsgResp(conn, m, reply)
|
||||
} else {
|
||||
reply.ErrCode = errCode
|
||||
reply.ErrMsg = errMsg
|
||||
ws.sendMsgResp(conn, m, reply)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) {
|
||||
bMsg, _ := json.Marshal(mReply)
|
||||
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
|
||||
if err != nil {
|
||||
log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply)
|
||||
}
|
||||
}
|
||||
func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) {
|
||||
mReply := make(map[string]interface{})
|
||||
mReply["errCode"] = errCode
|
||||
mReply["errMsg"] = errMsg
|
||||
ws.sendMsg(conn, mReply)
|
||||
}
|
155
src/msg_gateway/gate/rpc_server.go
Normal file
155
src/msg_gateway/gate/rpc_server.go
Normal file
@ -0,0 +1,155 @@
|
||||
package gate
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/constant"
|
||||
"Open_IM/src/common/log"
|
||||
pbRelay "Open_IM/src/proto/relay"
|
||||
"Open_IM/src/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"google.golang.org/grpc"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type RPCServer struct {
|
||||
rpcPort int
|
||||
rpcRegisterName string
|
||||
etcdSchema string
|
||||
etcdAddr []string
|
||||
}
|
||||
|
||||
func (r *RPCServer) onInit(rpcPort int) {
|
||||
r.rpcPort = rpcPort
|
||||
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName
|
||||
r.etcdSchema = config.Config.Etcd.EtcdSchema
|
||||
r.etcdAddr = config.Config.Etcd.EtcdAddr
|
||||
}
|
||||
func (r *RPCServer) run() {
|
||||
ip := utils.ServerIP
|
||||
registerAddress := ip + ":" + utils.IntToString(r.rpcPort)
|
||||
listener, err := net.Listen("tcp", registerAddress)
|
||||
if err != nil {
|
||||
log.ErrorByArgs(fmt.Sprintf("fail to listening consumer, err:%v\n", err))
|
||||
return
|
||||
}
|
||||
defer listener.Close()
|
||||
srv := grpc.NewServer()
|
||||
defer srv.GracefulStop()
|
||||
pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r)
|
||||
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10)
|
||||
if err != nil {
|
||||
log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error())
|
||||
}
|
||||
err = srv.Serve(listener)
|
||||
if err != nil {
|
||||
log.ErrorByKv("push message rpc listening err", "", "err", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbRelay.MsgToUserResp, error) {
|
||||
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
|
||||
var resp []*pbRelay.SingleMsgToUser
|
||||
var RecvID string
|
||||
msg := make(map[string]interface{})
|
||||
mReply := make(map[string]interface{})
|
||||
mReply["reqIdentifier"] = constant.WSPushMsg
|
||||
mReply["errCode"] = 0
|
||||
mReply["errMsg"] = ""
|
||||
msg["sendID"] = in.SendID
|
||||
msg["recvID"] = in.RecvID
|
||||
msg["msgFrom"] = in.MsgFrom
|
||||
msg["contentType"] = in.ContentType
|
||||
msg["sessionType"] = in.SessionType
|
||||
msg["serverMsgID"] = in.ServerMsgID
|
||||
msg["content"] = in.Content
|
||||
msg["seq"] = in.RecvSeq
|
||||
msg["sendTime"] = in.SendTime
|
||||
msg["isEmphasize"] = in.IsEmphasize
|
||||
msg["senderPlatformID"] = in.PlatformID
|
||||
mReply["data"] = msg
|
||||
bMsg, _ := json.Marshal(mReply)
|
||||
switch in.GetContentType() {
|
||||
case constant.SyncSenderMsg:
|
||||
log.InfoByKv("come sync", in.OperationID, "args", in.String())
|
||||
RecvID = in.GetSendID()
|
||||
for key, conn := range ws.wsUserToConn {
|
||||
UIDAndPID := strings.Split(key, " ")
|
||||
if UIDAndPID[0] == RecvID && utils.PlatformIDToName(in.GetPlatformID()) != UIDAndPID[1] {
|
||||
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
|
||||
temp := &pbRelay.SingleMsgToUser{
|
||||
ResultCode: resultCode,
|
||||
RecvID: UIDAndPID[0],
|
||||
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||
}
|
||||
resp = append(resp, temp)
|
||||
}
|
||||
|
||||
}
|
||||
default:
|
||||
log.InfoByKv("not come sync", in.OperationID, "args", in.String())
|
||||
switch in.SessionType {
|
||||
case constant.SingleChatType:
|
||||
log.InfoByKv("come single", in.OperationID, "args", in.String())
|
||||
RecvID = in.GetRecvID()
|
||||
case constant.GroupChatType:
|
||||
RecvID = strings.Split(in.GetRecvID(), " ")[0]
|
||||
default:
|
||||
}
|
||||
log.InfoByKv("come for range", in.OperationID, "args", in.String())
|
||||
|
||||
for key, conn := range ws.wsUserToConn {
|
||||
UIDAndPID := strings.Split(key, " ")
|
||||
if UIDAndPID[0] == RecvID {
|
||||
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
|
||||
temp := &pbRelay.SingleMsgToUser{
|
||||
ResultCode: resultCode,
|
||||
RecvID: UIDAndPID[0],
|
||||
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||
}
|
||||
resp = append(resp, temp)
|
||||
}
|
||||
}
|
||||
}
|
||||
return &pbRelay.MsgToUserResp{
|
||||
Resp: resp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//func (r *RPCServer) SendMsgByWS(_ context.Context, in *pbRelay.SendMsgByWSReq) (*pbRelay.MsgToUserResp, error) {
|
||||
// log.InfoByKv("SendMsgByWS is arriving ", in.OperationID, "args", in.String())
|
||||
// resp := new(pbRelay.MsgToUserResp)
|
||||
// MsgId := ws.genMsgNum()
|
||||
// pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||
// pbData.SendID = in.SendID
|
||||
// pbData.RecvID = in.RecvID
|
||||
// pbData.MsgID = MsgId
|
||||
// pbData.SessionType = in.SessionType
|
||||
// pbData.MsgFrom = in.MsgFrom
|
||||
// pbData.Content = in.Content
|
||||
// pbData.ContentType = in.ContentType
|
||||
// pbData.OperationID = in.OperationID
|
||||
// pbData.SendTime = in.SendTime
|
||||
// pbData.PlatformID = in.PlatformID
|
||||
// pKafka.writeMsg(&pbData)
|
||||
// return resp, nil
|
||||
//}
|
||||
|
||||
func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
|
||||
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
|
||||
if err != nil {
|
||||
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
|
||||
"error", err, "senderPlatform", utils.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
|
||||
ResultCode = -2
|
||||
return ResultCode
|
||||
} else {
|
||||
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String())
|
||||
ResultCode = 0
|
||||
return ResultCode
|
||||
}
|
||||
|
||||
}
|
60
src/msg_gateway/gate/validate.go
Normal file
60
src/msg_gateway/gate/validate.go
Normal file
@ -0,0 +1,60 @@
|
||||
/*
|
||||
** description("").
|
||||
** copyright('Open_IM,www.Open_IM.io').
|
||||
** author("fg,Gordon@tuoyun.net").
|
||||
** time(2021/5/21 15:29).
|
||||
*/
|
||||
package gate
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/constant"
|
||||
"Open_IM/src/common/log"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
type Req struct {
|
||||
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
|
||||
Token string `json:"token" validate:"required"`
|
||||
SendID string `json:"sendID" validate:"required"`
|
||||
OperationID string `json:"operationID" validate:"required"`
|
||||
MsgIncr int32 `json:"msgIncr" validate:"required"`
|
||||
Data map[string]interface{} `json:"data"`
|
||||
}
|
||||
type SeqData struct {
|
||||
SeqBegin int64 `mapstructure:"seqBegin" validate:"required"`
|
||||
SeqEnd int64 `mapstructure:"seqEnd" validate:"required"`
|
||||
}
|
||||
type MsgData struct {
|
||||
PlatformID int32 `mapstructure:"platformID" validate:"required"`
|
||||
SessionType int32 `mapstructure:"sessionType" validate:"required"`
|
||||
MsgFrom int32 `mapstructure:"msgFrom" validate:"required"`
|
||||
ContentType int32 `mapstructure:"contentType" validate:"required"`
|
||||
RecvID string `mapstructure:"recvID" validate:"required"`
|
||||
ForceList []string `mapstructure:"forceList" validate:"required"`
|
||||
Content string `mapstructure:"content" validate:"required"`
|
||||
Options map[string]interface{} `mapstructure:"options" validate:"required"`
|
||||
ClientMsgID string `mapstructure:"clientMsgID" validate:"required"`
|
||||
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
|
||||
Ext map[string]interface{} `mapstructure:"ext"`
|
||||
}
|
||||
|
||||
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) {
|
||||
switch r {
|
||||
case constant.WSPullMsg:
|
||||
data = SeqData{}
|
||||
case constant.WSSendMsg:
|
||||
data = MsgData{}
|
||||
default:
|
||||
}
|
||||
if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
|
||||
log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
|
||||
return false, 203, err.Error(), nil
|
||||
} else if err := validate.Struct(data); err != nil {
|
||||
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
|
||||
return false, 204, err.Error(), nil
|
||||
|
||||
} else {
|
||||
return true, 0, "", data
|
||||
}
|
||||
|
||||
}
|
137
src/msg_gateway/gate/ws_server.go
Normal file
137
src/msg_gateway/gate/ws_server.go
Normal file
@ -0,0 +1,137 @@
|
||||
package gate
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/log"
|
||||
"Open_IM/src/utils"
|
||||
"github.com/gorilla/websocket"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WServer struct {
|
||||
wsAddr string
|
||||
wsMaxConnNum int
|
||||
wsUpGrader *websocket.Upgrader
|
||||
wsConnToUser map[*websocket.Conn]string
|
||||
wsUserToConn map[string]*websocket.Conn
|
||||
}
|
||||
|
||||
func (ws *WServer) onInit(wsPort int) {
|
||||
ip := utils.ServerIP
|
||||
ws.wsAddr = ip + ":" + utils.IntToString(wsPort)
|
||||
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
|
||||
ws.wsConnToUser = make(map[*websocket.Conn]string)
|
||||
ws.wsUserToConn = make(map[string]*websocket.Conn)
|
||||
ws.wsUpGrader = &websocket.Upgrader{
|
||||
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
|
||||
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WServer) run() {
|
||||
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
|
||||
err := http.ListenAndServe(ws.wsAddr, nil) //Start listening
|
||||
if err != nil {
|
||||
log.ErrorByKv("Ws listening err", "", "err", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if ws.headerCheck(w, r) {
|
||||
query := r.URL.Query()
|
||||
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
|
||||
if err != nil {
|
||||
log.ErrorByKv("upgrade http conn err", "", "err", err)
|
||||
return
|
||||
} else {
|
||||
//Connection mapping relationship,
|
||||
//userID+" "+platformID->conn
|
||||
SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
|
||||
ws.addUserConn(SendID, conn)
|
||||
go ws.readMsg(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WServer) readMsg(conn *websocket.Conn) {
|
||||
for {
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err)
|
||||
ws.delUserConn(conn)
|
||||
return
|
||||
}
|
||||
ws.msgParse(conn, msg)
|
||||
//ws.writeMsg(conn, 1, chat)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error {
|
||||
rwLock.Lock()
|
||||
defer rwLock.Unlock()
|
||||
return conn.WriteMessage(a, msg)
|
||||
|
||||
}
|
||||
func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) {
|
||||
rwLock.Lock()
|
||||
defer rwLock.Unlock()
|
||||
ws.wsConnToUser[conn] = uid
|
||||
ws.wsUserToConn[uid] = conn
|
||||
log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid)
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) delUserConn(conn *websocket.Conn) {
|
||||
rwLock.Lock()
|
||||
defer rwLock.Unlock()
|
||||
if uid, ok := ws.wsConnToUser[conn]; ok {
|
||||
if _, ok = ws.wsUserToConn[uid]; ok {
|
||||
delete(ws.wsUserToConn, uid)
|
||||
log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "uid", uid)
|
||||
}
|
||||
delete(ws.wsConnToUser, conn)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func (ws *WServer) getUserConn(uid string) *websocket.Conn {
|
||||
rwLock.RLock()
|
||||
defer rwLock.RUnlock()
|
||||
if conn, ok := ws.wsUserToConn[uid]; ok {
|
||||
return conn
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (ws *WServer) getUserUid(conn *websocket.Conn) string {
|
||||
rwLock.RLock()
|
||||
defer rwLock.RUnlock()
|
||||
|
||||
if uid, ok := ws.wsConnToUser[conn]; ok {
|
||||
return uid
|
||||
}
|
||||
return ""
|
||||
}
|
||||
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool {
|
||||
status := http.StatusUnauthorized
|
||||
query := r.URL.Query()
|
||||
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
|
||||
if !utils.VerifyToken(query["token"][0], query["sendID"][0]) {
|
||||
log.ErrorByKv("Token verify failed", "", "query", query)
|
||||
w.Header().Set("Sec-Websocket-Version", "13")
|
||||
http.Error(w, http.StatusText(status), status)
|
||||
return false
|
||||
} else {
|
||||
log.InfoByKv("Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
log.ErrorByKv("Args err", "", "query", query)
|
||||
w.Header().Set("Sec-Websocket-Version", "13")
|
||||
http.Error(w, http.StatusText(status), status)
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
18
src/msg_gateway/open_im_msg_gateway.go
Normal file
18
src/msg_gateway/open_im_msg_gateway.go
Normal file
@ -0,0 +1,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"Open_IM/src/msg_gateway/gate"
|
||||
"flag"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
rpcPort := flag.Int("rpc_port", 10500, "rpc listening port")
|
||||
wsPort := flag.Int("ws_port", 10800, "rpc listening port")
|
||||
flag.Parse()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
gate.Init(*rpcPort, *wsPort)
|
||||
gate.Run()
|
||||
wg.Wait()
|
||||
}
|
26
src/msg_transfer/Makefile
Normal file
26
src/msg_transfer/Makefile
Normal file
@ -0,0 +1,26 @@
|
||||
.PHONY: all build run gotool install clean help
|
||||
|
||||
BINARY_NAME=open_im_msg_transfer
|
||||
BIN_DIR=../../bin/
|
||||
LAN_FILE=.go
|
||||
GO_FILE:=${BINARY_NAME}${LAN_FILE}
|
||||
|
||||
all: gotool build
|
||||
|
||||
build:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
|
||||
|
||||
run:
|
||||
@go run ./
|
||||
|
||||
gotool:
|
||||
go fmt ./
|
||||
go vet ./
|
||||
|
||||
install:
|
||||
make build
|
||||
mv ${BINARY_NAME} ${BIN_DIR}
|
||||
|
||||
clean:
|
||||
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
|
||||
|
20
src/msg_transfer/logic/db.go
Normal file
20
src/msg_transfer/logic/db.go
Normal file
@ -0,0 +1,20 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/db"
|
||||
"Open_IM/src/common/db/mysql_model/im_mysql_model"
|
||||
pbMsg "Open_IM/src/proto/chat"
|
||||
)
|
||||
|
||||
func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error {
|
||||
seq, err := db.DB.IncrUserSeq(uid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pbMsg.RecvSeq = seq
|
||||
return db.DB.SaveUserChat(uid, pbMsg)
|
||||
}
|
||||
|
||||
func getGroupList(groupID string) ([]string, error) {
|
||||
return im_mysql_model.SelectGroupList(groupID)
|
||||
}
|
178
src/msg_transfer/logic/history_msg_handler.go
Normal file
178
src/msg_transfer/logic/history_msg_handler.go
Normal file
@ -0,0 +1,178 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/constant"
|
||||
kfk "Open_IM/src/common/kafka"
|
||||
"Open_IM/src/common/log"
|
||||
pbMsg "Open_IM/src/proto/chat"
|
||||
pb "Open_IM/src/proto/group"
|
||||
pbPush "Open_IM/src/proto/push"
|
||||
"Open_IM/src/utils"
|
||||
"context"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/skiffer-git/grpc-etcdv3/getcdv3"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type fcb func(msg []byte, msgKey string)
|
||||
|
||||
type HistoryConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
}
|
||||
|
||||
func (mc *HistoryConsumerHandler) Init() {
|
||||
mc.msgHandle = make(map[string]fcb)
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
|
||||
|
||||
}
|
||||
|
||||
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
||||
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
|
||||
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||
err := proto.Unmarshal(msg, &pbData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{}
|
||||
pbSaveData.SendID = pbData.SendID
|
||||
pbSaveData.SendTime = pbData.SendTime
|
||||
pbSaveData.Content = pbData.Content
|
||||
pbSaveData.MsgFrom = pbData.MsgFrom
|
||||
pbSaveData.ContentType = pbData.ContentType
|
||||
pbSaveData.SessionType = pbData.SessionType
|
||||
pbSaveData.MsgID = pbData.MsgID
|
||||
pbSaveData.RecvID = pbData.RecvID
|
||||
pbSaveData.PlatformID = pbData.PlatformID
|
||||
Options := utils.JsonStringToMap(pbData.Options)
|
||||
//Control whether to store offline messages (mongo)
|
||||
isHistory := utils.GetSwitchFromOptions(Options, "history")
|
||||
//Control whether to store history messages (mysql)
|
||||
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
||||
//Control whether to push message to sender's other terminal
|
||||
isSenderSync := utils.GetSwitchFromOptions(Options, "senderSync")
|
||||
if pbData.SessionType == constant.SingleChatType {
|
||||
log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist, isSenderSync)
|
||||
if isHistory {
|
||||
if msgKey == pbSaveData.RecvID {
|
||||
err := saveUserChat(pbData.RecvID, &pbSaveData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
||||
}
|
||||
pbSaveData.Options = pbData.Options
|
||||
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||
sendMessageToPush(&pbSaveData)
|
||||
} else if msgKey == pbSaveData.SendID {
|
||||
err := saveUserChat(pbData.SendID, &pbSaveData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
||||
}
|
||||
if isSenderSync {
|
||||
pbSaveData.ContentType = constant.SyncSenderMsg
|
||||
log.WarnByKv("SyncSenderMsg come here", pbData.OperationID, pbSaveData.String())
|
||||
sendMessageToPush(&pbSaveData)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} else if pbData.SessionType == constant.GroupChatType {
|
||||
log.Info("", "", "msg_transfer chat type = GroupChatType")
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
|
||||
client := pb.NewGroupClient(etcdConn)
|
||||
req := &pb.GetGroupInfoReq{
|
||||
GroupID: pbSaveData.RecvID,
|
||||
Token: pbData.Token,
|
||||
OperationID: pbSaveData.OperationID,
|
||||
}
|
||||
log.Info("", "", "msg_transfer call group rpc, data = %s", req.String())
|
||||
reply, err := client.GetGroupInfo(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Error("", "", "msg_transfer client.GetGroupInfo fail, err = %s", err.Error())
|
||||
return
|
||||
}
|
||||
for _, v := range reply.GroupMemberList {
|
||||
//Store RecvID is userID+" "+groupID when chatType is Group
|
||||
pbSaveData.RecvID = v.UserID + " " + pbSaveData.RecvID
|
||||
if isHistory {
|
||||
saveUserChat(v.UserID, &pbSaveData)
|
||||
}
|
||||
pbSaveData.Options = pbData.Options
|
||||
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||
if v.UserID != pbSaveData.SendID {
|
||||
if utils.IsContain(v.UserID, pbData.ForceList) {
|
||||
pbSaveData.IsEmphasize = true
|
||||
}
|
||||
sendMessageToPush(&pbSaveData)
|
||||
} else {
|
||||
if isSenderSync {
|
||||
pbSaveData.ContentType = constant.SyncSenderMsg
|
||||
sendMessageToPush(&pbSaveData)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType)
|
||||
}
|
||||
|
||||
log.InfoByKv("msg_transfer handle topic success...", "", "")
|
||||
}
|
||||
|
||||
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
|
||||
mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) {
|
||||
msg := pbPush.PushMsgReq{}
|
||||
msg.OperationID = message.OperationID
|
||||
msg.PlatformID = message.PlatformID
|
||||
msg.Content = message.Content
|
||||
msg.ContentType = message.ContentType
|
||||
msg.SessionType = message.SessionType
|
||||
msg.RecvID = message.RecvID
|
||||
msg.SendID = message.SendID
|
||||
msg.IsEmphasize = message.IsEmphasize
|
||||
msg.MsgFrom = message.MsgFrom
|
||||
msg.Options = message.Options
|
||||
msg.RecvSeq = message.RecvSeq
|
||||
msg.SendTime = message.SendTime
|
||||
msg.MsgID = message.MsgID
|
||||
msg.OfflineInfo = message.OfflineInfo
|
||||
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
|
||||
if grpcConn == nil {
|
||||
log.ErrorByKv("rpc dial failed", msg.OperationID, "push data", msg.String())
|
||||
pid, offset, err := producer.SendMessage(message)
|
||||
if err != nil {
|
||||
log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
|
||||
_, err := msgClient.PushMsg(context.Background(), &msg)
|
||||
defer grpcConn.Close()
|
||||
if err != nil {
|
||||
log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error())
|
||||
pid, offset, err := producer.SendMessage(message)
|
||||
if err != nil {
|
||||
log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
} else {
|
||||
log.InfoByKv("rpc send success", msg.OperationID, "push data", msg.String())
|
||||
|
||||
}
|
||||
}
|
25
src/msg_transfer/logic/init.go
Normal file
25
src/msg_transfer/logic/init.go
Normal file
@ -0,0 +1,25 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/kafka"
|
||||
"Open_IM/src/common/log"
|
||||
)
|
||||
|
||||
var (
|
||||
persistentCH PersistentConsumerHandler
|
||||
historyCH HistoryConsumerHandler
|
||||
producer *kafka.Producer
|
||||
)
|
||||
|
||||
func Init() {
|
||||
log.NewPrivateLog(config.Config.ModuleName.MsgTransferName)
|
||||
persistentCH.Init()
|
||||
historyCH.Init()
|
||||
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
|
||||
}
|
||||
func Run() {
|
||||
//register mysqlConsumerHandler to
|
||||
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||
}
|
63
src/msg_transfer/logic/persistent_msg_handler.go
Normal file
63
src/msg_transfer/logic/persistent_msg_handler.go
Normal file
@ -0,0 +1,63 @@
|
||||
/*
|
||||
** description("").
|
||||
** copyright('tuoyun,www.tuoyun.net').
|
||||
** author("fg,Gordon@tuoyun.net").
|
||||
** time(2021/5/11 15:37).
|
||||
*/
|
||||
package logic
|
||||
|
||||
import (
|
||||
"Open_IM/src/common/config"
|
||||
"Open_IM/src/common/db/mysql_model/im_mysql_msg_model"
|
||||
kfk "Open_IM/src/common/kafka"
|
||||
"Open_IM/src/common/log"
|
||||
pbMsg "Open_IM/src/proto/chat"
|
||||
"Open_IM/src/utils"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
type PersistentConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
persistentConsumerGroup *kfk.MConsumerGroup
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) Init() {
|
||||
pc.msgHandle = make(map[string]fcb)
|
||||
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
|
||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||
|
||||
}
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) {
|
||||
log.InfoByKv("chat come here mysql!!!", "", "chat", string(msg))
|
||||
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||
err := proto.Unmarshal(msg, &pbData)
|
||||
if err != nil {
|
||||
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
Options := utils.JsonStringToMap(pbData.Options)
|
||||
//Control whether to store history messages (mysql)
|
||||
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
||||
//Only process receiver data
|
||||
if isPersist && msgKey == pbData.RecvID {
|
||||
log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
|
||||
if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
|
||||
log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
|
||||
pc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
|
||||
}
|
||||
return nil
|
||||
}
|
14
src/msg_transfer/open_im_msg_transfer.go
Normal file
14
src/msg_transfer/open_im_msg_transfer.go
Normal file
@ -0,0 +1,14 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"Open_IM/src/msg_transfer/logic"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
logic.Init()
|
||||
logic.Run()
|
||||
wg.Wait()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user