Merge branch 'v2.3.0release' of github.com:OpenIMSDK/Open-IM-Server into v2.3.0release

wangchuxiao 2023-03-13 16:19:08 +08:00
commit cb1487109a
30 changed files with 843 additions and 574 deletions

View File

@@ -111,6 +111,8 @@ func main() {
 		groupRouterGroup.POST("/get_group_all_member_list", group.GetGroupAllMemberList) //1
 		groupRouterGroup.POST("/get_group_members_info", group.GetGroupMembersInfo)       //1
 		groupRouterGroup.POST("/invite_user_to_group", group.InviteUserToGroup)           //1
+		//only for supergroup
+		groupRouterGroup.POST("/invite_user_to_groups", group.InviteUserToGroups)
 		groupRouterGroup.POST("/get_joined_group_list", group.GetJoinedGroupList)
 		groupRouterGroup.POST("/dismiss_group", group.DismissGroup) //
 		groupRouterGroup.POST("/mute_group_member", group.MuteGroupMember)

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: admin-cms
-        image: openim/admin_cms:v2.3.4
+        image: openim/admin_cms:v2.3.8
         # imagePullPolicy: Always # re-pull the image on every start
         ports:
         - containerPort: 10200
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,11 +30,13 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
      containers:
       - name: api
-        image: openim/api:v2.3.4
+        image: openim/api:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10002
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,11 +30,12 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: openim-config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update
 ---

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: auth
-        image: openim/auth:v2.3.4
+        image: openim/auth:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10160
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -2,7 +2,7 @@
 source ./path_info.cfg
 # images version
-version=v2.3.4
+version=v2.3.8
 git pull
 cd ../script/; ./build_all_service.sh
 cd ../deploy_k8s/

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: cache
-        image: openim/cache:v2.3.4
+        image: openim/cache:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10240
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: openim-config
-      - name: usualconfig
-        configMap:
-          name: openim-usualConfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: cms-api
-        image: openim/cms_api:v2.3.4
+        image: openim/cms_api:v2.3.8
         imagePullPolicy: Always
         ports:
         - containerPort: 10006
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,11 +30,12 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update
 ---

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: conversation
-        image: openim/conversation:v2.3.4
+        image: openim/conversation:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10230
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -34,11 +31,12 @@ spec:
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: demo
-        image: openim/demo:v2.3.4
+        image: openim/demo:v2.3.8
         imagePullPolicy: Always
         ports:
         - containerPort: 10004
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,11 +30,12 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update
 ---

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: friend
-        image: openim/friend:v2.3.4
+        image: openim/friend:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10120
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,11 +30,12 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: group
-        image: openim/group:v2.3.4
+        image: openim/group:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10150
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -9,18 +9,17 @@
 6. Set rpcRegisterIP to empty. This is the address each rpc registers to ETCD; when left empty, each rpc registers its pod address to ETCD, which is required for rpc requests to work (important)
 7. If minio is used as object storage, the minio address also needs to be changed
 8. If offline push is used, the push offline-push configuration needs to be changed
+9. Set the imAPIURL field in the demo to the ingress or service address of the openIM api, so that the demo pod can reach it (important)
+10. Other optional configuration changes, such as SMS, push, etc.
 ### 2. Create the im configMap in the k8s openim namespace from the project root
 1. Create a dedicated namespace for the open-IM project
 ```
 kubectl create namespace openim
 ```
-2. In the project root, using config/config.yaml
+2. After modifying config.yaml, create the configmaps in the project root; config/usualConfig.yaml only needs to be mounted, its configuration does not need to be changed
 ```
-kubectl -n openim create configmap config --from-file=config/config.yaml
-kubectl -n openim create configmap usualconfig --from-file=config/usualConfig.yaml
+kubectl -n openim create configmap openim-config --from-file=config/config.yaml
+kubectl -n openim create configmap openim-usualconfig --from-file=config/usualConfig.yaml
 ```
 View the configmap
 ```

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: msg
-        image: openim/msg:v2.3.4
+        image: openim/msg:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10130
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualConfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: msg-gateway
-        image: openim/msg_gateway:v2.3.4
+        image: openim/msg_gateway:v2.3.8
         # imagePullPolicy: Always
         ports:
         - name: rpc-port
@@ -26,9 +26,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -36,11 +33,12 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update
 ---

View File

@@ -15,15 +15,12 @@ spec:
     spec:
       containers:
       - name: msg-transfer
-        image: openim/msg_transfer:v2.3.4
+        image: openim/msg_transfer:v2.3.8
         # imagePullPolicy: Always
         volumeMounts:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -31,10 +28,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: office
-        image: openim/office:v2.3.4
+        image: openim/office:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10210
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: organization
-        image: openim/organization:v2.3.4
+        image: openim/organization:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10220
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: push
-        image: openim/push:v2.3.4
+        image: openim/push:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10170
@@ -23,9 +23,6 @@ spec:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualConfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -33,10 +30,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: sdk-server
-        image: openim/sdk_server:v2.3.4
+        image: openim/sdk_server:v2.3.8
         # imagePullPolicy: Always
         ports:
         - containerPort: 10003

View File

@@ -15,15 +15,12 @@ spec:
     spec:
       containers:
       - name: user
-        image: openim/user:v2.3.4
+        image: openim/user:v2.3.8
         # imagePullPolicy: Always
         volumeMounts:
         - name: config
           mountPath: /Open-IM-Server/config
           readOnly: true
-        - name: usualconfig
-          mountPath: /Open-IM-Server/config
-          readOnly: true
         env:
         - name: CONFIG_NAME
           value: "/Open-IM-Server"
@@ -31,10 +28,11 @@ spec:
           value: "/Open-IM-Server"
       volumes:
       - name: config
-        configMap:
-          name: config
-      - name: usualconfig
-        configMap:
-          name: usualconfig
+        projected:
+          sources:
+          - configMap:
+              name: openim-config
+          - configMap:
+              name: openim-usualconfig
   strategy: # update strategy
     type: RollingUpdate # rolling update

View File

@@ -370,6 +370,47 @@ func InviteUserToGroup(c *gin.Context) {
 	log.NewInfo(req.OperationID, "InviteUserToGroup api return ", resp)
 	c.JSON(http.StatusOK, resp)
 }
+func InviteUserToGroups(c *gin.Context) {
+	params := api.InviteUserToGroupsReq{}
+	if err := c.BindJSON(&params); err != nil {
+		log.NewError("0", "BindJSON failed ", err.Error())
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
+		return
+	}
+	req := &rpc.InviteUserToGroupsReq{}
+	utils.CopyStructFields(req, &params)
+	var ok bool
+	var errInfo string
+	ok, req.OpUserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
+	if !ok {
+		errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
+		log.NewError(req.OperationID, errMsg)
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 500, "errMsg": errMsg})
+		return
+	}
+	log.NewInfo(req.OperationID, "InviteUserToGroups args ", req.String())
+	etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName, req.OperationID)
+	if etcdConn == nil {
+		errMsg := req.OperationID + " getcdv3.GetDefaultConn == nil"
+		log.NewError(req.OperationID, errMsg)
+		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
+		return
+	}
+	client := rpc.NewGroupClient(etcdConn)
+	RpcResp, err := client.InviteUserToGroups(context.Background(), req)
+	if err != nil {
+		log.NewError(req.OperationID, "InviteUserToGroups failed ", err.Error(), req.String())
+		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()})
+		return
+	}
+	resp := api.InviteUserToGroupsResp{CommResp: api.CommResp{ErrCode: RpcResp.ErrCode, ErrMsg: RpcResp.ErrMsg}}
+	log.NewInfo(req.OperationID, "InviteUserToGroups api return ", resp)
+	c.JSON(http.StatusOK, resp)
+}
 // @Summary Create group
 // @Description Create group
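For orientation, here is a minimal client-side sketch of exercising the new endpoint. The host, port, route prefix, and token value are illustrative assumptions (the api container exposes port 10002 elsewhere in this diff, and group routes conventionally sit under a /group route group); this is not something the commit itself pins down:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Body shape follows api.InviteUserToGroupsReq; all values are placeholders.
	body := []byte(`{"groupIDList": ["sg_001", "sg_002"], "invitedUserID": "user_001", "operationID": "op_123"}`)
	req, _ := http.NewRequest("POST", "http://localhost:10002/group/invite_user_to_groups", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("token", "<app-manager-token>") // the handler resolves OpUserID from this header
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}

Note that the rpc implementation later in this diff accepts the call only when the token resolves to an app manager (token_verify.IsManagerUserID).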

View File

@@ -101,47 +101,48 @@ func (r *RPCServer) run() {
 	}
 }
 func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
-	log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
-	var resp []*pbRelay.SingleMsgToUserPlatform
-	msgBytes, _ := proto.Marshal(in.MsgData)
-	mReply := Resp{
-		ReqIdentifier: constant.WSPushMsg,
-		OperationID:   in.OperationID,
-		Data:          msgBytes,
-	}
-	var replyBytes bytes.Buffer
-	enc := gob.NewEncoder(&replyBytes)
-	err := enc.Encode(mReply)
-	if err != nil {
-		log.NewError(in.OperationID, "data encode err", err.Error())
-	}
-	var tag bool
-	recvID := in.PushToUserID
-	for _, v := range r.platformList {
-		if conn := ws.getUserConn(recvID, v); conn != nil {
-			tag = true
-			resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
-			temp := &pbRelay.SingleMsgToUserPlatform{
-				ResultCode:     resultCode,
-				RecvID:         recvID,
-				RecvPlatFormID: int32(v),
-			}
-			resp = append(resp, temp)
-		} else {
-			temp := &pbRelay.SingleMsgToUserPlatform{
-				ResultCode:     -1,
-				RecvID:         recvID,
-				RecvPlatFormID: int32(v),
-			}
-			resp = append(resp, temp)
-		}
-	}
-	if !tag {
-		log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
-	}
-	return &pbRelay.OnlinePushMsgResp{
-		Resp: resp,
-	}, nil
+	//log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
+	//var resp []*pbRelay.SingleMsgToUserPlatform
+	//msgBytes, _ := proto.Marshal(in.MsgData)
+	//mReply := Resp{
+	//	ReqIdentifier: constant.WSPushMsg,
+	//	OperationID:   in.OperationID,
+	//	Data:          msgBytes,
+	//}
+	//var replyBytes bytes.Buffer
+	//enc := gob.NewEncoder(&replyBytes)
+	//err := enc.Encode(mReply)
+	//if err != nil {
+	//	log.NewError(in.OperationID, "data encode err", err.Error())
+	//}
+	//var tag bool
+	//recvID := in.PushToUserID
+	//for _, v := range r.platformList {
+	//	if conn := ws.getUserConn(recvID, v); conn != nil {
+	//		tag = true
+	//		resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
+	//		temp := &pbRelay.SingleMsgToUserPlatform{
+	//			ResultCode:     resultCode,
+	//			RecvID:         recvID,
+	//			RecvPlatFormID: int32(v),
+	//		}
+	//		resp = append(resp, temp)
+	//	} else {
+	//		temp := &pbRelay.SingleMsgToUserPlatform{
+	//			ResultCode:     -1,
+	//			RecvID:         recvID,
+	//			RecvPlatFormID: int32(v),
+	//		}
+	//		resp = append(resp, temp)
+	//	}
+	//}
+	//if !tag {
+	//	log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
+	//}
+	//return &pbRelay.OnlinePushMsgResp{
+	//	Resp: resp,
+	//}, nil
+	return nil, nil
 }
 func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
 	log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
@@ -154,13 +155,13 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
 		temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
 		temp.UserID = userID
 		userConnMap := ws.getUserAllCons(userID)
-		for platform, userConn := range userConnMap {
-			if userConn != nil {
+		for platform, userConns := range userConnMap {
+			if len(userConns) != 0 {
 				ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
 				ps.Platform = constant.PlatformIDToName(platform)
 				ps.Status = constant.OnlineStatus
-				ps.ConnID = userConn.connID
-				ps.IsBackground = userConn.IsBackground
+				ps.ConnID = userConns[0].connID
+				ps.IsBackground = userConns[0].IsBackground
 				temp.Status = constant.OnlineStatus
 				temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
 			}
@@ -196,25 +197,29 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
 			UserID: v,
 		}
 		userConnMap := ws.getUserAllCons(v)
-		for platform, userConn := range userConnMap {
-			if userConn != nil {
-				temp := &pbRelay.SingleMsgToUserPlatform{
-					RecvID:         v,
-					RecvPlatFormID: int32(platform),
-				}
-				if !userConn.IsBackground {
-					resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
-					if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
-						tempT.OnlinePush = true
-						promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
-						log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
-						temp.ResultCode = resultCode
-						resp = append(resp, temp)
-					}
-				} else {
-					temp.ResultCode = -2
-					resp = append(resp, temp)
-				}
-			}
-		}
+		for platform, userConns := range userConnMap {
+			if len(userConns) != 0 {
+				log.NewWarn(req.OperationID, "conns is ", len(userConns), platform, userConns)
+				for _, userConn := range userConns {
+					temp := &pbRelay.SingleMsgToUserPlatform{
+						RecvID:         v,
+						RecvPlatFormID: int32(platform),
+					}
+					if !userConn.IsBackground || req.MsgData.ContentType == constant.SuperGroupUpdateNotification {
+						resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
+						if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
+							tempT.OnlinePush = true
+							promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
+							log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
+							temp.ResultCode = resultCode
+							resp = append(resp, temp)
+						}
+					} else {
+						temp.ResultCode = -2
+						resp = append(resp, temp)
+					}
+				}
+			}
+		}
 		tempT.Resp = resp
@@ -247,22 +252,28 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel
 			UserID: v,
 		}
 		userConnMap := ws.getUserAllCons(v)
-		for platform, userConn := range userConnMap {
-			if userConn != nil && userConn.IsBackground {
-				temp := &pbRelay.SingleMsgToUserPlatform{
-					RecvID:         v,
-					RecvPlatFormID: int32(platform),
-				}
-				if constant.PlatformIDToClass(int(userConn.PlatformID)) == constant.TerminalPC || userConn.PlatformID == constant.WebPlatformID {
-					resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
-					if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
-						tempT.OnlinePush = true
-						promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
-						log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
-						temp.ResultCode = resultCode
-						resp = append(resp, temp)
-					}
-				}
-			}
-		}
+		for platform, userConns := range userConnMap {
+			if len(userConns) != 0 {
+				for _, userConn := range userConns {
+					temp := &pbRelay.SingleMsgToUserPlatform{
+						RecvID:         v,
+						RecvPlatFormID: int32(platform),
+					}
+					if !userConn.IsBackground {
+						resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
+						if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
+							tempT.OnlinePush = true
+							promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
+							log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
+							temp.ResultCode = resultCode
+							resp = append(resp, temp)
+						}
+					} else {
+						temp.ResultCode = -2
+						resp = append(resp, temp)
+					}
+				}
+			}
+		}
 		tempT.Resp = resp
@@ -274,76 +285,77 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel
 	}, nil
 }
 func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
-	log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
-	var singleUserResult []*pbRelay.SingelMsgToUserResultList
-
-	for _, v := range req.PushToUserIDList {
-		var resp []*pbRelay.SingleMsgToUserPlatform
-		tempT := &pbRelay.SingelMsgToUserResultList{
-			UserID: v,
-		}
-		userConnMap := ws.getUserAllCons(v)
-		var platformList []int
-		for k, _ := range userConnMap {
-			platformList = append(platformList, k)
-		}
-		log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
-		needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
-		log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
-		for platform, list := range needPushMapList {
-			if list != nil {
-				log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:")
-				//for _, v := range list {
-				//	log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
-				//	req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
-				//	log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
-				//}
-				msgBytes, err := proto.Marshal(list)
-				if err != nil {
-					log.Error(req.OperationID, "proto marshal err", err.Error())
-					continue
-				}
-				req.MsgData.MsgDataList = msgBytes
-				//req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
-				log.Debug(req.OperationID, "r.encodeWsData no string")
-				//log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
-
-				log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
-				replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
-				if err != nil {
-					log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
-					continue
-				}
-				log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
-				resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
-				if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
-					tempT.OnlinePush = true
-					log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
-					temp := &pbRelay.SingleMsgToUserPlatform{
-						ResultCode:     resultCode,
-						RecvID:         v,
-						RecvPlatFormID: int32(platform),
-					}
-					resp = append(resp, temp)
-				}
-			} else {
-				if utils.IsContainInt(platform, r.pushTerminal) {
-					tempT.OnlinePush = true
-					temp := &pbRelay.SingleMsgToUserPlatform{
-						ResultCode:     0,
-						RecvID:         v,
-						RecvPlatFormID: int32(platform),
-					}
-					resp = append(resp, temp)
-				}
-			}
-		}
-		tempT.Resp = resp
-		singleUserResult = append(singleUserResult, tempT)
-	}
-	return &pbRelay.OnlineBatchPushOneMsgResp{
-		SinglePushResult: singleUserResult,
-	}, nil
+	//log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
+	//var singleUserResult []*pbRelay.SingelMsgToUserResultList
+	//
+	//for _, v := range req.PushToUserIDList {
+	//	var resp []*pbRelay.SingleMsgToUserPlatform
+	//	tempT := &pbRelay.SingelMsgToUserResultList{
+	//		UserID: v,
+	//	}
+	//	userConnMap := ws.getUserAllCons(v)
+	//	var platformList []int
+	//	for k, _ := range userConnMap {
+	//		platformList = append(platformList, k)
+	//	}
+	//	log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
+	//	needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
+	//	log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
+	//	for platform, list := range needPushMapList {
+	//		if list != nil {
+	//			log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:")
+	//			//for _, v := range list {
+	//			//	log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
+	//			//	req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
+	//			//	log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
+	//			//}
+	//			msgBytes, err := proto.Marshal(list)
+	//			if err != nil {
+	//				log.Error(req.OperationID, "proto marshal err", err.Error())
+	//				continue
+	//			}
+	//			req.MsgData.MsgDataList = msgBytes
+	//			//req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
+	//			log.Debug(req.OperationID, "r.encodeWsData no string")
+	//			//log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
+	//
+	//			log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
+	//			replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
+	//			if err != nil {
+	//				log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
+	//				continue
+	//			}
+	//			log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
+	//			resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
+	//			if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
+	//				tempT.OnlinePush = true
+	//				log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
+	//				temp := &pbRelay.SingleMsgToUserPlatform{
+	//					ResultCode:     resultCode,
+	//					RecvID:         v,
+	//					RecvPlatFormID: int32(platform),
+	//				}
+	//				resp = append(resp, temp)
+	//			}
+	//		} else {
+	//			if utils.IsContainInt(platform, r.pushTerminal) {
+	//				tempT.OnlinePush = true
+	//				temp := &pbRelay.SingleMsgToUserPlatform{
+	//					ResultCode:     0,
+	//					RecvID:         v,
+	//					RecvPlatFormID: int32(platform),
+	//				}
+	//				resp = append(resp, temp)
+	//			}
+	//		}
+	//	}
+	//	tempT.Resp = resp
+	//	singleUserResult = append(singleUserResult, tempT)
+	//}
+	//return &pbRelay.OnlineBatchPushOneMsgResp{
+	//	SinglePushResult: singleUserResult,
+	//}, nil
+	return nil, nil
 }
 func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
 	log.Debug(operationID, "encodeWsData begin", wsData.String())
@@ -374,10 +386,11 @@ func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOffl
 		log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID)
 		SetTokenKicked(v, int(req.PlatformID), req.OperationID)
 		oldConnMap := ws.getUserAllCons(v)
-		if conn, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn]
+		if conns, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn]
 			log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v)
-			ws.sendKickMsg(conn, req.OperationID)
-			conn.Close()
+			for _, conn := range conns {
+				ws.sendKickMsg(conn, req.OperationID)
+			}
 		}
 	}
 	return &pbRelay.KickUserOfflineResp{}, nil

View File

@@ -45,15 +45,13 @@ type WServer struct {
 	wsAddr       string
 	wsMaxConnNum int
 	wsUpGrader   *websocket.Upgrader
-	wsConnToUser map[*UserConn]map[int]string
-	wsUserToConn map[string]map[int]*UserConn
+	wsUserToConn map[string]map[int][]*UserConn
 }
 func (ws *WServer) onInit(wsPort int) {
 	ws.wsAddr = ":" + utils.IntToString(wsPort)
 	ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
-	ws.wsConnToUser = make(map[*UserConn]map[int]string)
-	ws.wsUserToConn = make(map[string]map[int]*UserConn)
+	ws.wsUserToConn = make(map[string]map[int][]*UserConn)
 	ws.wsUpGrader = &websocket.Upgrader{
 		HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
 		ReadBufferSize:   config.Config.LongConnSvr.WebsocketMaxMsgLen,
@@ -203,8 +201,11 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int,
 		fallthrough
 	case constant.AllLoginButSameTermKick:
 		if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
-			if oldConn, ok := oldConnMap[platformID]; ok {
+			if oldConns, ok := oldConnMap[platformID]; ok {
 				log.NewDebug(operationID, uid, platformID, "kick old conn")
+				for _, conn := range oldConns {
+					ws.sendKickMsg(conn, operationID)
+				}
 				m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
 				if err != nil && err != go_redis.Nil {
 					log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
@@ -227,16 +228,12 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int,
 					log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
 					return
 				}
-				err = oldConn.Close()
-				//delete(oldConnMap, platformID)
+				delete(oldConnMap, platformID)
 				ws.wsUserToConn[uid] = oldConnMap
 				if len(oldConnMap) == 0 {
 					delete(ws.wsUserToConn, uid)
 				}
-				delete(ws.wsConnToUser, oldConn)
-				if err != nil {
-					log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
-				}
 			} else {
 				log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
 			}
@@ -259,9 +256,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
 		fallthrough
 	case constant.AllLoginButSameTermKick:
 		if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
-			if oldConn, ok := oldConnMap[platformID]; ok {
+			if oldConns, ok := oldConnMap[platformID]; ok {
 				log.NewDebug(operationID, uid, platformID, "kick old conn")
-				ws.sendKickMsg(oldConn, operationID)
+				for _, conn := range oldConns {
+					ws.sendKickMsg(conn, operationID)
+				}
 				m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
 				if err != nil && err != go_redis.Nil {
 					log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
@@ -284,16 +283,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
 					log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
 					return
 				}
-				err = oldConn.Close()
 				delete(oldConnMap, platformID)
 				ws.wsUserToConn[uid] = oldConnMap
 				if len(oldConnMap) == 0 {
 					delete(ws.wsUserToConn, uid)
 				}
-				delete(ws.wsConnToUser, oldConn)
-				if err != nil {
-					log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
-				}
 				callbackResp := callbackUserKickOff(operationID, uid, platformID)
 				if callbackResp.ErrCode != 0 {
 					log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
@@ -328,6 +322,11 @@ func (ws *WServer) sendKickMsg(oldConn *UserConn, operationID string) {
 	if err != nil {
 		log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), err.Error())
 	}
+	errClose := oldConn.Close()
+	if errClose != nil {
+		log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "close old conn error", oldConn.RemoteAddr().String(), errClose.Error())
+	}
 }
 func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) {
@@ -341,23 +340,24 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token
 	go ws.MultiTerminalLoginRemoteChecker(uid, int32(platformID), token, operationID)
 	ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID)
 	if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
-		oldConnMap[platformID] = conn
+		if conns, ok := oldConnMap[platformID]; ok {
+			conns = append(conns, conn)
+			oldConnMap[platformID] = conns
+		} else {
+			var conns []*UserConn
+			conns = append(conns, conn)
+			oldConnMap[platformID] = conns
+		}
 		ws.wsUserToConn[uid] = oldConnMap
 		log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap)
 	} else {
-		i := make(map[int]*UserConn)
-		i[platformID] = conn
+		i := make(map[int][]*UserConn)
+		var conns []*UserConn
+		conns = append(conns, conn)
+		i[platformID] = conns
 		ws.wsUserToConn[uid] = i
 		log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid])
 	}
-	if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
-		oldStringMap[platformID] = uid
-		ws.wsConnToUser[conn] = oldStringMap
-	} else {
-		i := make(map[int]string)
-		i[platformID] = uid
-		ws.wsConnToUser[conn] = i
-	}
 	count := 0
 	for _, v := range ws.wsUserToConn {
 		count = count + len(v)
@@ -370,32 +370,40 @@ func (ws *WServer) delUserConn(conn *UserConn) {
 	rwLock.Lock()
 	defer rwLock.Unlock()
 	operationID := utils.OperationIDGenerator()
-	var uid string
-	var platform int
-	if oldStringMap, okg := ws.wsConnToUser[conn]; okg {
-		for k, v := range oldStringMap {
-			platform = k
-			uid = v
-		}
-		if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
-			delete(oldConnMap, platform)
-			ws.wsUserToConn[uid] = oldConnMap
-			if len(oldConnMap) == 0 {
-				delete(ws.wsUserToConn, uid)
-			}
-			count := 0
-			for _, v := range ws.wsUserToConn {
-				count = count + len(v)
-			}
-			log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
-		} else {
-			log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
-		}
-		delete(ws.wsConnToUser, conn)
-	}
+	platform := int(conn.PlatformID)
+	if oldConnMap, ok := ws.wsUserToConn[conn.userID]; ok { // only recycle self conn
+		if oldconns, okMap := oldConnMap[platform]; okMap {
+			var a []*UserConn
+			for _, client := range oldconns {
+				if client != conn {
+					a = append(a, client)
+				}
+			}
+			if len(a) != 0 {
+				oldConnMap[platform] = a
+			} else {
+				delete(oldConnMap, platform)
+			}
+		}
+		ws.wsUserToConn[conn.userID] = oldConnMap
+		if len(oldConnMap) == 0 {
+			delete(ws.wsUserToConn, conn.userID)
+		}
+		count := 0
+		for _, v := range ws.wsUserToConn {
+			count = count + len(v)
+		}
+		log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", conn.userID, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
+	}
 	err := conn.Close()
 	if err != nil {
-		log.Error(operationID, " close err", "", "uid", uid, "platform", platform)
+		log.Error(operationID, " close err", "", "uid", conn.userID, "platform", platform)
 	}
 	if conn.PlatformID == 0 || conn.connID == "" {
 		log.NewWarn(operationID, utils.GetSelfFuncName(), "PlatformID or connID is null", conn.PlatformID, conn.connID)
@@ -408,21 +416,21 @@ func (ws *WServer) delUserConn(conn *UserConn) {
 }
-func (ws *WServer) getUserConn(uid string, platform int) *UserConn {
-	rwLock.RLock()
-	defer rwLock.RUnlock()
-	if connMap, ok := ws.wsUserToConn[uid]; ok {
-		if conn, flag := connMap[platform]; flag {
-			return conn
-		}
-	}
-	return nil
-}
-func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
+// func (ws *WServer) getUserConn(uid string, platform int) *UserConn {
+// 	rwLock.RLock()
+// 	defer rwLock.RUnlock()
+// 	if connMap, ok := ws.wsUserToConn[uid]; ok {
+// 		if conn, flag := connMap[platform]; flag {
+// 			return conn
+// 		}
+// 	}
+// 	return nil
+// }
+func (ws *WServer) getUserAllCons(uid string) map[int][]*UserConn {
 	rwLock.RLock()
 	defer rwLock.RUnlock()
 	if connMap, ok := ws.wsUserToConn[uid]; ok {
-		newConnMap := make(map[int]*UserConn)
+		newConnMap := make(map[int][]*UserConn)
 		for k, v := range connMap {
 			newConnMap[k] = v
 		}
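The change above is the heart of this file's refactor: the registry goes from one connection per (user, platform) to a slice of connections, so a second login on the same platform no longer overwrites the first, and removal must filter by connection identity. A minimal standalone sketch of that bookkeeping, with the types reduced to the bare minimum (an illustration, not the production code):

package main

import "fmt"

type UserConn struct{ connID string }

// userID -> platformID -> all live connections on that platform
var wsUserToConn = map[string]map[int][]*UserConn{}

func addConn(uid string, platformID int, c *UserConn) {
	m, ok := wsUserToConn[uid]
	if !ok {
		m = map[int][]*UserConn{}
		wsUserToConn[uid] = m
	}
	m[platformID] = append(m[platformID], c) // multiple conns per platform are now allowed
}

func delConn(uid string, platformID int, c *UserConn) {
	conns := wsUserToConn[uid][platformID]
	var kept []*UserConn
	for _, x := range conns {
		if x != c { // remove by identity, keep the user's other conns
			kept = append(kept, x)
		}
	}
	if len(kept) == 0 {
		delete(wsUserToConn[uid], platformID)
		if len(wsUserToConn[uid]) == 0 {
			delete(wsUserToConn, uid)
		}
	} else {
		wsUserToConn[uid][platformID] = kept
	}
}

func main() {
	a, b := &UserConn{"a"}, &UserConn{"b"}
	addConn("u1", 5, a)
	addConn("u1", 5, b) // second connection on the same platform
	delConn("u1", 5, a)
	fmt.Println(len(wsUserToConn["u1"][5])) // 1
}

delUserConn above follows the same pattern: it rebuilds the slice without the closing connection and drops the platform key only when no connections remain.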

View File

@@ -564,6 +564,35 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
 	return &resp, nil
 }
+func (s *groupServer) InviteUserToGroups(ctx context.Context, req *pbGroup.InviteUserToGroupsReq) (*pbGroup.InviteUserToGroupsResp, error) {
+	if !token_verify.IsManagerUserID(req.OpUserID) {
+		log.NewError(req.OperationID, "no permission InviteUserToGroups ", req.String())
+		return &pbGroup.InviteUserToGroupsResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
+	}
+	for _, v := range req.GroupIDList {
+		groupInfo, err := imdb.GetGroupInfoByGroupID(v)
+		if err != nil {
+			log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", v, err)
+			return &pbGroup.InviteUserToGroupsResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error() + v}, nil
+		}
+		if groupInfo.Status == constant.GroupStatusDismissed {
+			errMsg := " group status is dismissed " + v
+			return &pbGroup.InviteUserToGroupsResp{ErrCode: constant.ErrStatus.ErrCode, ErrMsg: errMsg}, nil
+		}
+	}
+	if err := db.DB.AddUserToSuperGroups(req.GroupIDList, req.InvitedUserID); err != nil {
+		log.NewError(req.OperationID, "AddUserToSuperGroups failed ", err.Error())
+		return &pbGroup.InviteUserToGroupsResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
+	}
+	if err := rocksCache.DelJoinedSuperGroupIDListFromCache(req.InvitedUserID); err != nil {
+		log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
+	}
+	chat.SuperGroupNotification(req.OperationID, req.InvitedUserID, req.InvitedUserID)
+	log.NewInfo(req.OperationID, "InviteUserToGroups rpc return ")
+	return &pbGroup.InviteUserToGroupsResp{}, nil
+}
 func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) {
 	log.NewInfo(req.OperationID, "GetGroupAllMember, args ", req.String())
 	var resp pbGroup.GetGroupAllMemberResp

View File

@@ -47,7 +47,15 @@ type InviteUserToGroupResp struct {
 	CommResp
 	UserIDResultList []*UserIDResult `json:"data"`
 }
+type InviteUserToGroupsReq struct {
+	GroupIDList   []string `json:"groupIDList" binding:"required"`
+	InvitedUserID string   `json:"invitedUserID" binding:"required"`
+	Reason        string   `json:"reason"`
+	OperationID   string   `json:"operationID" binding:"required"`
+}
+type InviteUserToGroupsResp struct {
+	CommResp
+}
 type GetJoinedGroupListReq struct {
 	OperationID string `json:"operationID" binding:"required"`
 	FromUserID  string `json:"fromUserID" binding:"required"`

View File

@@ -361,4 +361,4 @@ const StatisticsTimeInterval = 60
 const MaxNotificationNum = 500
-const CurrentVersion = "v2.3.4-rc0"
+const CurrentVersion = "v2.3.8-rc0"

View File

@@ -112,6 +112,10 @@ func init() {
 	if err := createMongoIndex(mongoClient, cTag, true, "tag_id"); err != nil {
 		panic(err.Error() + "index create failed " + cTag + " tag_id")
 	}
+	if err := createMongoIndex(mongoClient, cUserToSuperGroup, true, "user_id"); err != nil {
+		panic(err.Error() + "index create failed " + cUserToSuperGroup + " user_id")
+	}
 	DB.mongoClient = mongoClient
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

View File

@@ -1233,6 +1233,36 @@ func (d *DataBases) AddUserToSuperGroup(groupID string, userIDList []string) err
 	_ = session.CommitTransaction(ctx)
 	return err
 }
+func (d *DataBases) AddUserToSuperGroups(groupIDList []string, userID string) error {
+	ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
+	c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup)
+	session, err := d.mongoClient.StartSession()
+	if err != nil {
+		return utils.Wrap(err, "start session failed")
+	}
+	defer session.EndSession(ctx)
+	sCtx := mongo.NewSessionContext(ctx, session)
+	if err = session.StartTransaction(); err != nil {
+		return utils.Wrap(err, "start transaction failed")
+	}
+	_, err = c.UpdateMany(sCtx, bson.M{"group_id": bson.M{"$in": groupIDList}}, bson.M{"$addToSet": bson.M{"member_id_list": userID}})
+	if err != nil {
+		_ = session.AbortTransaction(ctx)
+		return utils.Wrap(err, "transaction failed")
+	}
+	c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup)
+	upsert := true
+	opts := &options.UpdateOptions{
+		Upsert: &upsert,
+	}
+	_, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": bson.M{"$each": groupIDList}}}, opts)
+	if err != nil {
+		_ = session.AbortTransaction(ctx)
+		return utils.Wrap(err, "transaction failed")
+	}
+	_ = session.CommitTransaction(ctx)
+	return err
+}
 func (d *DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error {
 	ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
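From the rpc layer this is a single call, as InviteUserToGroups earlier in this diff shows; a short usage sketch with placeholder IDs, assuming an operationID is in scope:

// Adds user_001 to both supergroups in one transaction: $addToSet into each
// group's member_id_list, then $each-appends the group IDs into the user's
// group_id_list reverse index (upserted if the document does not exist yet).
if err := db.DB.AddUserToSuperGroups([]string{"sg_001", "sg_002"}, "user_001"); err != nil {
	log.NewError(operationID, "AddUserToSuperGroups failed ", err.Error())
}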

File diff suppressed because it is too large

View File

@@ -200,6 +200,17 @@ message InviteUserToGroupResp {
 	repeated Id2Result Id2ResultList = 3; // 0 ok, -1 error
 }
+message InviteUserToGroupsReq {
+	string OperationID = 1;
+	repeated string groupIDList = 2;
+	string Reason = 3;
+	string invitedUserID = 4;
+	string OpUserID = 5; //group member or app manager
+}
+message InviteUserToGroupsResp {
+	int32 ErrCode = 1;
+	string ErrMsg = 2;
+}
 message GetGroupAllMemberReq {
 	string GroupID = 1;
@@ -413,6 +424,7 @@ service group{
 	rpc kickGroupMember(KickGroupMemberReq) returns (KickGroupMemberResp);
 	rpc getJoinedGroupList(GetJoinedGroupListReq) returns (GetJoinedGroupListResp);
 	rpc inviteUserToGroup(InviteUserToGroupReq) returns (InviteUserToGroupResp);
+	rpc inviteUserToGroups(InviteUserToGroupsReq) returns (InviteUserToGroupsResp);
 	rpc getGroupAllMember(GetGroupAllMemberReq) returns(GetGroupAllMemberResp);
 	rpc GetGroups(GetGroupsReq) returns(GetGroupsResp);
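Finally, a sketch of calling the new rpc through the generated Go stub, mirroring the etcd-discovery pattern the API handler in this diff uses. The package alias (pbGroup) and all IDs are placeholder assumptions; operationID is assumed to be in scope:

etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema,
	strings.Join(config.Config.Etcd.EtcdAddr, ","),
	config.Config.RpcRegisterName.OpenImGroupName, operationID)
client := pbGroup.NewGroupClient(etcdConn)
resp, err := client.InviteUserToGroups(context.Background(), &pbGroup.InviteUserToGroupsReq{
	OperationID:   operationID,
	GroupIDList:   []string{"sg_001", "sg_002"}, // placeholder supergroup IDs
	InvitedUserID: "user_001",                   // placeholder user ID
	OpUserID:      "admin_001",                  // must satisfy token_verify.IsManagerUserID
})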