This commit is contained in:
truongpx-Mac 2023-11-11 20:25:45 +07:00
parent 355074f800
commit 7e5ca46d2b
17 changed files with 583 additions and 11 deletions

2
.env
View File

@ -162,7 +162,7 @@ KAFKA_ADDRESS=172.28.0.5
# Port on which Kafka distributed streaming platform is running.
# Default: KAFKA_PORT=19092
KAFKA_PORT=19094
KAFKA_PORT=9094
# Topic in Kafka for storing the latest messages in Redis.
# Default: KAFKA_LATESTMSG_REDIS_TOPIC=latestMsgToRedis

BIN
cmd/openim-api/__debug_bin Executable file

Binary file not shown.

View File

@ -124,6 +124,131 @@ const docTemplate = `{
}
}
},
"/user/get_users_online_status": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Get user online status",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Get user online status",
"operationId": "GetUsersOnlineStatus",
"parameters": [
{
"type": "string",
"description": "Operation Id",
"name": "OperationId",
"in": "header",
"required": true
},
{
"description": "Request",
"name": "req",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusReq"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/apiresp.ApiResponse"
},
{
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessResult"
}
}
}
}
]
}
},
"400": {
"description": "Errcode is 400, which is generally a parameter input error.",
"schema": {}
},
"500": {
"description": "ERRCODE is 500 generally an internal error of the server",
"schema": {}
}
}
}
},
"/user/set_global_msg_recv_opt": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Set the overall disturbance",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Set the overall disturbance",
"operationId": "SetGlobalRecvMessageOpt",
"parameters": [
{
"type": "string",
"description": "Operation Id",
"name": "OperationId",
"in": "header",
"required": true
},
{
"description": "GlobalRecvmsGopt is the global disturbance setting 0 to turn off 1 to open",
"name": "req",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/user.SetGlobalRecvMessageOptReq"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/user.SetGlobalRecvMessageOptResp"
}
},
"400": {
"description": "Errcode is 400, which is generally a parameter input error.",
"schema": {}
},
"500": {
"description": "ERRCODE is 500 generally an internal error of the server",
"schema": {}
}
}
}
},
"/user/update_user_info": {
"post": {
"security": [
@ -231,6 +356,21 @@ const docTemplate = `{
}
},
"definitions": {
"apiresp.ApiResponse": {
"type": "object",
"properties": {
"data": {},
"errCode": {
"type": "integer"
},
"errDlt": {
"type": "string"
},
"errMsg": {
"type": "string"
}
}
},
"auth.UserTokenReq": {
"type": "object",
"properties": {
@ -256,6 +396,54 @@ const docTemplate = `{
}
}
},
"msggateway.GetUsersOnlineStatusReq": {
"type": "object",
"properties": {
"userIDs": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"msggateway.GetUsersOnlineStatusResp_SuccessDetail": {
"type": "object",
"properties": {
"connID": {
"type": "string"
},
"isBackground": {
"type": "boolean"
},
"platform": {
"type": "string"
},
"status": {
"type": "string"
},
"token": {
"type": "string"
}
}
},
"msggateway.GetUsersOnlineStatusResp_SuccessResult": {
"type": "object",
"properties": {
"detailPlatformStatus": {
"type": "array",
"items": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessDetail"
}
},
"status": {
"type": "string"
},
"userID": {
"type": "string"
}
}
},
"sdkws.UserInfo": {
"type": "object",
"properties": {
@ -304,6 +492,20 @@ const docTemplate = `{
}
}
},
"user.SetGlobalRecvMessageOptReq": {
"type": "object",
"properties": {
"globalRecvMsgOpt": {
"type": "integer"
},
"userID": {
"type": "string"
}
}
},
"user.SetGlobalRecvMessageOptResp": {
"type": "object"
},
"user.UpdateUserInfoReq": {
"type": "object",
"properties": {

View File

@ -122,6 +122,131 @@
}
}
},
"/user/get_users_online_status": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Get user online status",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Get user online status",
"operationId": "GetUsersOnlineStatus",
"parameters": [
{
"type": "string",
"description": "Operation Id",
"name": "OperationId",
"in": "header",
"required": true
},
{
"description": "Request",
"name": "req",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusReq"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/apiresp.ApiResponse"
},
{
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessResult"
}
}
}
}
]
}
},
"400": {
"description": "Errcode is 400, which is generally a parameter input error.",
"schema": {}
},
"500": {
"description": "ERRCODE is 500 generally an internal error of the server",
"schema": {}
}
}
}
},
"/user/set_global_msg_recv_opt": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Set the overall disturbance",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Set the overall disturbance",
"operationId": "SetGlobalRecvMessageOpt",
"parameters": [
{
"type": "string",
"description": "Operation Id",
"name": "OperationId",
"in": "header",
"required": true
},
{
"description": "GlobalRecvmsGopt is the global disturbance setting 0 to turn off 1 to open",
"name": "req",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/user.SetGlobalRecvMessageOptReq"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/user.SetGlobalRecvMessageOptResp"
}
},
"400": {
"description": "Errcode is 400, which is generally a parameter input error.",
"schema": {}
},
"500": {
"description": "ERRCODE is 500 generally an internal error of the server",
"schema": {}
}
}
}
},
"/user/update_user_info": {
"post": {
"security": [
@ -229,6 +354,21 @@
}
},
"definitions": {
"apiresp.ApiResponse": {
"type": "object",
"properties": {
"data": {},
"errCode": {
"type": "integer"
},
"errDlt": {
"type": "string"
},
"errMsg": {
"type": "string"
}
}
},
"auth.UserTokenReq": {
"type": "object",
"properties": {
@ -254,6 +394,54 @@
}
}
},
"msggateway.GetUsersOnlineStatusReq": {
"type": "object",
"properties": {
"userIDs": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"msggateway.GetUsersOnlineStatusResp_SuccessDetail": {
"type": "object",
"properties": {
"connID": {
"type": "string"
},
"isBackground": {
"type": "boolean"
},
"platform": {
"type": "string"
},
"status": {
"type": "string"
},
"token": {
"type": "string"
}
}
},
"msggateway.GetUsersOnlineStatusResp_SuccessResult": {
"type": "object",
"properties": {
"detailPlatformStatus": {
"type": "array",
"items": {
"$ref": "#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessDetail"
}
},
"status": {
"type": "string"
},
"userID": {
"type": "string"
}
}
},
"sdkws.UserInfo": {
"type": "object",
"properties": {
@ -302,6 +490,20 @@
}
}
},
"user.SetGlobalRecvMessageOptReq": {
"type": "object",
"properties": {
"globalRecvMsgOpt": {
"type": "integer"
},
"userID": {
"type": "string"
}
}
},
"user.SetGlobalRecvMessageOptResp": {
"type": "object"
},
"user.UpdateUserInfoReq": {
"type": "object",
"properties": {

View File

@ -1,5 +1,15 @@
basePath: /
definitions:
apiresp.ApiResponse:
properties:
data: {}
errCode:
type: integer
errDlt:
type: string
errMsg:
type: string
type: object
auth.UserTokenReq:
properties:
platformID:
@ -16,6 +26,37 @@ definitions:
token:
type: string
type: object
msggateway.GetUsersOnlineStatusReq:
properties:
userIDs:
items:
type: string
type: array
type: object
msggateway.GetUsersOnlineStatusResp_SuccessDetail:
properties:
connID:
type: string
isBackground:
type: boolean
platform:
type: string
status:
type: string
token:
type: string
type: object
msggateway.GetUsersOnlineStatusResp_SuccessResult:
properties:
detailPlatformStatus:
items:
$ref: '#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessDetail'
type: array
status:
type: string
userID:
type: string
type: object
sdkws.UserInfo:
properties:
appMangerLevel:
@ -47,6 +88,15 @@ definitions:
$ref: '#/definitions/sdkws.UserInfo'
type: array
type: object
user.SetGlobalRecvMessageOptReq:
properties:
globalRecvMsgOpt:
type: integer
userID:
type: string
type: object
user.SetGlobalRecvMessageOptResp:
type: object
user.UpdateUserInfoReq:
properties:
userInfo:
@ -148,6 +198,86 @@ paths:
summary: Get user information
tags:
- User
/user/get_users_online_status:
post:
consumes:
- application/json
description: Get user online status
operationId: GetUsersOnlineStatus
parameters:
- description: Operation Id
in: header
name: OperationId
required: true
type: string
- description: Request
in: body
name: req
required: true
schema:
$ref: '#/definitions/msggateway.GetUsersOnlineStatusReq'
produces:
- application/json
responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/apiresp.ApiResponse'
- properties:
data:
items:
$ref: '#/definitions/msggateway.GetUsersOnlineStatusResp_SuccessResult'
type: array
type: object
"400":
description: Errcode is 400, which is generally a parameter input error.
schema: {}
"500":
description: ERRCODE is 500 generally an internal error of the server
schema: {}
security:
- ApiKeyAuth: []
summary: Get user online status
tags:
- User
/user/set_global_msg_recv_opt:
post:
consumes:
- application/json
description: Set the overall disturbance
operationId: SetGlobalRecvMessageOpt
parameters:
- description: Operation Id
in: header
name: OperationId
required: true
type: string
- description: GlobalRecvmsGopt is the global disturbance setting 0 to turn
off 1 to open
in: body
name: req
required: true
schema:
$ref: '#/definitions/user.SetGlobalRecvMessageOptReq'
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/user.SetGlobalRecvMessageOptResp'
"400":
description: Errcode is 400, which is generally a parameter input error.
schema: {}
"500":
description: ERRCODE is 500 generally an internal error of the server
schema: {}
security:
- ApiKeyAuth: []
summary: Set the overall disturbance
tags:
- User
/user/update_user_info:
post:
consumes:

BIN
cmd/openim-msggateway/__debug_bin Executable file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -63,7 +63,8 @@ mysql:
# Maximum connection pool size
mongo:
uri: ''
address: [ 172.28.0.1:37017 ]
address: [ 127.0.0.1:37017 ]
# address: [ 172.28.0.1:37017 ]
database: openIM_v3
username: root
password: openIM123
@ -87,9 +88,14 @@ redis:
# It's not recommended to modify this topic name
# Consumer group ID, it's not recommended to modify
kafka:
# username: 'root'
# password: ''
# username: 'user1'
# password: '123'
username: ''
password: ''
addr: [ 172.28.0.1:19094 ]
# addr: [ 172.28.0.1:19094 ]
addr: [ 127.0.0.1:9094 ]
latestMsgToRedis:
topic: "latestMsgToRedis"
offlineMsgToMongo:
@ -140,10 +146,12 @@ object:
apiURL: "http://127.0.0.1:10002"
minio:
bucket: "openim"
endpoint: "http://172.28.0.1:10005"
# endpoint: "http://172.28.0.1:10005"
endpoint: "http:///127.0.0.1:10005"
accessKeyID: "root"
secretAccessKey: "openIM123"
sessionToken: ''
# signEndpoint: "http://127.0.0.1:10005"
signEndpoint: "http://127.0.0.1:10005"
publicRead: false
cos:
@ -251,8 +259,8 @@ push:
# Built-in app manager user IDs
# Built-in app manager nicknames
manager:
userID: [ "openIM123456", "openIM654321", "openIMAdmin" ]
nickname: [ "system1", "system2", "system3" ]
userID: [ "openIM123456", "openIM654321", "openIMAdmin" ,"tpx"]
nickname: [ "system1", "system2", "system3" ,"truongpx999"]
# Multi-platform login policy
# For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time

View File

@ -107,9 +107,15 @@ services:
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@<your_host>:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://${DOCKER_BRIDGE_GATEWAY}:${KAFKA_PORT}
# - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://${DOCKER_BRIDGE_GATEWAY}:${KAFKA_PORT}
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://127.0.0.1:${KAFKA_PORT}
# KAFKA_ADVERTISED_LISTENERS: INSIDE://127.0.0.1:9092,OUTSIDE://127.0.0.1:9092
# KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# - KAFKA_CLIENT_USERS=user1
# - KAFKA_CLIENT_PASSWORDS=123
networks:
server:
ipv4_address: ${KAFKA_NETWORK_ADDRESS}

View File

@ -69,6 +69,19 @@ func (u *UserApi) UpdateUserInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateUserInfo, u.Client, c)
}
// @Summary Set the overall disturbance
// @Description Set the overall disturbance
// @Tags User
// @ID SetGlobalRecvMessageOpt
// @Accept json
// @Param OperationId header string true "Operation Id"
// @Param req body user.SetGlobalRecvMessageOptReq true "GlobalRecvmsGopt is the global disturbance setting 0 to turn off 1 to open"
// @Produce json
// @Success 200 {object} user.SetGlobalRecvMessageOptResp
// @Failure 500 {object} error "ERRCODE is 500 generally an internal error of the server"
// @Failure 400 {object} error "Errcode is 400, which is generally a parameter input error."
// @Security ApiKeyAuth
// @Router /user/set_global_msg_recv_opt [post]
func (u *UserApi) SetGlobalRecvMessageOpt(c *gin.Context) {
a2r.Call(user.UserClient.SetGlobalRecvMessageOpt, u.Client, c)
}
@ -104,7 +117,19 @@ func (u *UserApi) GetUsers(c *gin.Context) {
a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c)
}
// GetUsersOnlineStatus Get user online status.
// @Summary Get user online status
// @Description Get user online status
// @Tags User
// @ID GetUsersOnlineStatus
// @Accept json
// @Param OperationId header string true "Operation Id"
// @Param req body msggateway.GetUsersOnlineStatusReq true "Request"
// @Produce json
// @Success 200 {object} apiresp.ApiResponse{data=[]msggateway.GetUsersOnlineStatusResp_SuccessResult}
// @Failure 500 {object} error "ERRCODE is 500 generally an internal error of the server"
// @Failure 400 {object} error "Errcode is 400, which is generally a parameter input error."
// @Security ApiKeyAuth
// @Router /user/get_users_online_status [post]
func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
var req msggateway.GetUsersOnlineStatusReq
if err := c.BindJSON(&req); err != nil {
@ -236,7 +261,6 @@ func (u *UserApi) UnSubscriberStatus(c *gin.Context) {
a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c)
}
// GetUserStatus Get the online status of the user.
func (u *UserApi) GetUserStatus(c *gin.Context) {
a2r.Call(user.UserClient.GetUserStatus, u.Client, c)
}

View File

@ -135,7 +135,7 @@ func (c *Client) readMessage() {
return
}
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
if c.closed == true { // 连接刚置位已经关闭,但是协程还没退出的场景
if c.closed == true { // Scenes where the connection has been closed, but the coroutine has not exited
c.closedErr = ErrConnClosed
return
}

View File

@ -170,7 +170,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,.
// Get message/notify the storage message list, not store and push the message list,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
totalMsgs []*ContextMsg,
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {